株式会社クリアコード > ククログ

ククログ


Kafka ConnectのConnector開発入門

kafka-connect-fluentdを開発したので、その際に得た知見をまとめます。

Kafka Connectとは、簡単に説明するとKafka Consumersまたは、Kafka Producersの一種です。

あるデータソースからKafkaにデータを投入(Kafka Producers)したり、Kafkaから取り出したデータを別のところに流し(Kafka Consumers)たりするときにKafka ConsumersやKafka Producersでは、定型的なコードをたくさん書く必要がありました。 それを汎用化して、データを取り扱う箇所のコードのみを書けばいいようにまとめたものかKafka Connect APIです。

Connector Developer Guideを読めば、Kafka Connectorを開発する方法が書いてあります。 しかし、プロジェクトの初期設定は面倒なものです。定型コードを自動生成できるコマンドを使うと開発を始めるのが楽になります。

$ mvn -e archetype:generate -B -DarchetypeGroupId=io.confluent.maven \
      -DarchetypeArtifactId=kafka-connect-quickstart \
      -DarchetypeVersion=0.10.0.0 \
      -Dpackage=org.fluentd.kafka \
      -DgroupId=org.fluentd.kafka \
      -DartifactId=kafka-connect-fluentd \
      -Dversion=0.0.1

参考: https://github.com/jcustenborder/kafka-connect-archtype

@jcustenborderさんは、Confluentに所属し、多くのKafka Connectorを公開している方です。

これでpom.xmlやソースコードの雛形などが一通り生成されるので、開発を始めることができます。

kafka-connect-fluentdの場合は、自動生成されたpom.xmlをbuild.gradleに変換して利用しています。

Kafka Connector Source

Kafka Producerに対応するConnectorがSourceです。あるデータソースからKafkaにデータを書き込むために使用します。

kafka-connect-fluentd の場合は、Fluentdのout_forwardからkafka-connect-fluentdのSourceにデータを投げてKafkaにデータを流します。

FluentdSourceConnector

Sourceを開発する場合、以下を実装する必要があります。kafka-connect-fluentdの場合は以下のような名前にしました。

  • FluentdSourceConnector (SourceConnectorを継承)
  • FluentdSourceConnectorConfig (AbstractConfigを継承)
  • FluentdSourceTask (SourceTaskを継承)
FluentdSourceConnector

SourceConnectorを継承し、必要なメソッドを実装します。

  • public String version()
    • バージョン文字列を返します
  • public List<Map<String, String>> taskConfigs(int taskMax)
    • マルチスレッドに対応させたい場合は、taskMaxの値に応じて返り値のListMap<String,String>を詰め込みます
    • 設定ファイル(properties)に指定した値をそのままセットします
  • public void start(Map<String, String> properties)
    • Connector開始時にやることを書きます
  • public Class<? extends Task> taskClass()
    • taskクラス (FluentdSourceTaks.class) を返します
  • public void stop()
    • Connectorの終了時にやることを書きます
  • public ConfigDef config()
    • 設定の定義(FluentdSourceConnectorConfig.conf())を返します
FluentdSourceConnectorConfig

public static ConfigDef conf()ConfigDefのAPIを使って設定を定義します。

    public static ConfigDef conf() {
        return new ConfigDef()
                .define(FLUENTD_PORT, Type.INT, 24224, Importance.HIGH,
                        "Port number to listen. Default: 24224")
                .define(FLUENTD_BIND, Type.STRING, "0.0.0.0", Importance.HIGH,
                        "Bind address to listen. Default: 0.0.0.0");
    }

define()は引数が5つあり、前から順に名前、型、デフォルト値、重要度、説明です。引数の数が異なるdefine()もありますが、詳細はjavadocを参照してください。

FluentdSourceTask

Sourceを実装する際に最も重要なクラスです。以下のメソッドを実装します。

  • public void start(Map<String, String> properties)
    • poll()から定期的にアクセスされる共通のリソースを準備します。daemonやスレッドが必要な場合は、ここで起動します。
  • public List<SourceRecord> poll()
    • Kafka Connectのフレームワーク側から定期的に呼ばれます。FluentdSourceTaskではキューに溜めたデータをList<SourceRecord>に詰めて返します。
  • public void stop()
    • このタスクを止めるときに、必要な処理があれば書きます。start で起動したdaemonやスレッドをここで停止します。
  • public String version()
    • バージョン文字列を返します。定型のコードなので詳細は省略します。

このクラスは複数のスレッドから使用される可能性があります。特にpoll()でアクセスするインスタンス変数などについてはマルチスレッドを意識したコードを書く必要があります。

Kafka Connector Sink

Kafka Consumerに対応するConnectorがSinkです。Kafkaから取り出したデータをどこかに書き込みます。

kafka-connect-fluentd の場合は、Kafkaからデータを取り出してkafka-connect-fluentdのSinkからFluentdのin_forwardへデータを流します。

FluentdSinkConnector

Sinkを開発する場合、以下を実装する必要があります。kafka-connect-fluentdの場合は以下のような名前にしました。

  • FluentdSinkConnector (SinkConnectorを継承)
  • FluentdSinkConnectorConfig (AbstractConfigを継承)
  • FluentdSinkTask (SinkTaskを継承)
FluentdSinkConnector

SinkConnectorを継承し必要なメソッドを実装します。

  • public String version()
    • バージョン文字列を返します
  • public Class<? extends Task> taskClass()
    • taskクラス(FluentdSinkTask.class)を返します
  • public List<Map<String, String>> taskConfigs(int maxTasks)
    • マルチスレッドに対応させたい場合は、taskMaxの値に応じて返り値のListMap<String,String>を詰め込みます
    • 設定ファイル(properties)に指定した値をそのままセットします
  • public void stop()
    • Connectorの終了時にやることがあれば書きます
  • public ConfigDef config()
    • 設定の定義('FluentdSinkConnectorConfig.conf()')を返します
FluentdSinkConnectorConfig

FluentdSourceConnectorConfigと同じようにpublic static ConfigDef conf()ConfigDefのAPIを使って設定を定義します。

FluentdSinkTask

Sinkを実装する際に最も重要なクラスです。以下のメソッドを実装します。

  • public String version()
    • バージョン文字列を返します。定型のコードなので詳細は省略します。
  • public void start(Map<String, String> properties)
    • このタスクを初期化します。FluentdSinkTaskではFluencyのインスタンスを作成しています。
  • public void put(Collection<SinkRecord> collection)
    • Kafkaから定期的に呼ばれます。FluentdSinkTaskではFluencyでデータをFluentdに送信しています。
  • public void flush(Map<TopicPartition, OffsetAndMetadata> map)
    • これをきちんと実装すれば exactly-once に対応できるらしいです。しかし Fluentd は exactly-once に対応していないので、今のところ単にFluencyのバッファをflushするだけにしています。
  • public void stop()
    • タスクの停止にやることがあれば、中身を実装します。FluentdSinkTaskではFluencyが全てのバッファをflushするのを待ちます。
スキーマについて

SchemaBuilderStructを利用すれば任意のデータ構造をスキーマにマッピングできるのですが、MessagePackとKafka ConnectのSchemaで表現できる型に違いがあるようです。 サポートしている型の違いついて理解を深め、差異をうまく吸収できるような実装をしたいと考えています。

パッケージング

GradleでMaven Centralにライブラリを公開する - たごもりすメモを参考にしてGradleでやりました。

依存関係にKafkaが入っているけど、配布するjarにはKafka関係のファイルを入れてはいけないことに注意が必要でした。

まとめ

思っていたよりも簡単に実装できたのでKafkaにデータを投入したり、Kafkaからデータを取り込みたいのだけどProducer/Consumerはちょっと難しいという場合にKafka Connectを検討してみるとよいのではないでしょうか。

タグ: Fluentd
2018-02-26

«前の記事: Rubyとクリアコード #ruby25th 最新記事 次の記事: Fluentd用のベンチマークツールを開発しました»
タグ:
年・日ごとに見る
2008|05|06|07|08|09|10|11|12|
2009|01|02|03|04|05|06|07|08|09|10|11|12|
2010|01|02|03|04|05|06|07|08|09|10|11|12|
2011|01|02|03|04|05|06|07|08|09|10|11|12|
2012|01|02|03|04|05|06|07|08|09|10|11|12|
2013|01|02|03|04|05|06|07|08|09|10|11|12|
2014|01|02|03|04|05|06|07|08|09|10|11|12|
2015|01|02|03|04|05|06|07|08|09|10|11|12|
2016|01|02|03|04|05|06|07|08|09|10|11|12|
2017|01|02|03|04|05|06|07|08|09|10|11|12|
2018|01|02|03|04|05|06|07|08|09|10|11|12|
2019|01|02|03|04|05|06|07|08|09|10|11|