ククログ

株式会社クリアコード > ククログ > 実装者向けのApache Arrowフォーマットの説明

実装者向けのApache Arrowフォーマットの説明

Apache Arrowの開発に参加している須藤です。Apache Arrowフォーマットの説明をするときはどうして速いのかやどうして効率がよいのかに注目して説明していました。今回は、Apache Arrowフォーマットのシリアライザー・デシリアライザーを実装する人向けに説明します。

Apache Arrowフォーマットの仕様

Apache Arrowフォーマットの仕様は https://arrow.apache.org/docs/format/Columnar.htmlhttps://github.com/apache/arrow/tree/main/format で提供されています。前者がテキストでの説明で、後者がそこから参照されているFlatBuffersの定義ファイルです。

ここで説明することはこれらを実装者向けに解説したものになります。ここで説明する内容によってこれらの理解を助けるはずですが、最終的にはこれらを参照しながら実装することになります。

Apache Arrowフォーマットデータの概観

ここではApache Arrowフォーマットで表現されたデータをApache Arrowフォーマットデータと呼ぶことにします。

Apache Arrowフォーマットデータは型付きの列指向のテーブルデータを表現しています。

Apache Arrowフォーマットデータ
+----------------+
| 型付き         |
| 列指向         |
| テーブルデータ |
+----------------+

テーブルデータは0個以上の「レコードバッチ」に分割されて表現されます。

Apache Arrowフォーマットデータ
+---------------------------------------------+
| 型付き                                      |
| 列指向                                      |
| テーブルデータ                              |
| +=================+ +=================+     |
| | レコードバッチ1 | | レコードバッチ2 | ... |
| +=================+ +=================+     |
+---------------------------------------------+

レコードバッチの中に各カラムのデータがあります。

Apache Arrowフォーマットデータ
+---------------------------------------------------------------------+
| 型付き                                                              |
| 列指向                                                              |
| テーブルデータ                                                      |
| +=============================+ +=============================+     |
| | レコードバッチ1             | | レコードバッチ2             | ... |
| | +~~~~~~~~~+ +~~~~~~~~~+     | | +~~~~~~~~~+ +~~~~~~~~~+     |     |
| | | カラム1 | | カラム2 | ... | | | カラム1 | | カラム2 | ... |     |
| | +~~~~~~~~~+ +~~~~~~~~~+     | | +~~~~~~~~~+ +~~~~~~~~~+     |     |
| +=============================+ +=============================+     |
+---------------------------------------------------------------------+

すべてのレコードバッチは同じスキーマを持ちます。(スキーマに含まれる辞書型の中身は変わることがありますが、初期実装では変わらないと思ってよいです。)

Apache Arrowフォーマットデータ
+---------------------------------------------------------------------+
| 型付き                                                              |
| 列指向                                                              |
| テーブルデータ         全部同じスキーマ                             |
| +=============================+ +=============================+     |
| | レコードバッチ1             | | レコードバッチ2             | ... |
| | +~~~~~~~~~+ +~~~~~~~~~+     | | +~~~~~~~~~+ +~~~~~~~~~+     |     |
| | | カラム1 | | カラム2 | ... | | | カラム1 | | カラム2 | ... |     |
| | +~~~~~~~~~+ +~~~~~~~~~+     | | +~~~~~~~~~+ +~~~~~~~~~+     |     |
| +=============================+ +=============================+     |
+---------------------------------------------------------------------+

スキーマとレコードバッチはFlatBuffersで表現されていますが、カラムはFlatBuffersの中にApache Arrow独自のレイアウトで表現されています。この独自レイアウトはメモリー上の表現としても使われているやつです。これをゼロコピーで使えることがApache Arrowの速度の優位性になるので、ゼロコピーで使えるように実装することが重要です。

Apache Arrowフォーマットデータ
+---------------------------------------------------------------------+
| 型付き                                                              |
| 列指向                  FlatBuffers                                 |
| テーブルデータ         全部同じスキーマ                             |
| +=============================+ +=============================+     |
| | レコードバッチ1 FlatBuffers | | レコードバッチ2 FlatBuffers | ... |
| | +~~~~~~~~~+ +~~~~~~~~~+     | | +~~~~~~~~~+ +~~~~~~~~~+     |     |
| | | カラム1 | | カラム2 | ... | | | カラム1 | | カラム2 | ... |     |
| | | 独自    | | 独自    |     | | | 独自    | | 独自    |     |     |
| | +~~~~~~~~~+ +~~~~~~~~~+     | | +~~~~~~~~~+ +~~~~~~~~~+     |     |
| +=============================+ +=============================+     |
+---------------------------------------------------------------------+

Apache Arrowフォーマットデータは「ストリーミングフォーマット」または「ファイルフォーマット」としてやりとりされます。

ストリーミングフォーマットは、名前の通り、データ全体を受け取りきる前でも受け取ったデータから順に処理することがことができます。逆に言うと、先頭から順番に処理することしかできません。ランダムアクセスはできません。よって、データ処理パイプラインなどでデータが準備できたらすぐに次の処理に回したいような場合はストリーミングフォーマットの方が適しています。

(↓の図で伝わる?)

producer ------------------------------------> consumer
+--------+ +------+ +------+ +------+ +------+ +------+
| マット | | ォー | | グフ | | ミン | | リー | | スト |
+--------+ +------+ +------+ +------+ +------+ +------+

一方、ファイルフォーマットはデータ全体を受け取った後でしか処理を開始できません。受け取ったあとであれば、先頭から順に処理することもできますし、ランダムアクセスもできます。名前の通り、ファイルに保存して、それをmmap()してゼロコピーで使うのがよくある使い方です。

補足:厳密にはデータ全体を受け取らなくてもデータ内の任意の位置を読み込むことができるなら処理を開始できます。たとえば、HTTPのRangeリクエストを使えば、リモートのファイルフォーマットのApache Arrowデータすべてをダウンロードしなくても処理できます。

(↓の図で伝わる?)

                  +----------------------+
producer write -> | ファイルフォーマット | <- mmap() consumer
                  +----------------------+

Apache Arrowフォーマットデータの概観はつかめましたか?

Apache Arrowフォーマットデータデシリアライザーの実装

概観をつかんだところでどのように実装していくかの話に入りましょう。大きな処理はApache Arrowフォーマットデータのシリアライズ(書き出し)とデシリアライズ(読み込み)になります。どちらから作るのがよいかというとデシリアライズです。

すでに既存の実装がいくつもあるので、既存の実装でシリアライズしたApache Arrowフォーマットデータをデシリアライズするところから実装するのが楽なのです。デシリアライズ処理を実装する中でApache Arrowデータの理解も進むはずなので、その知見も活かしてシリアライズ処理を実装します。

デシリアライズ処理はどのように実装していくとよいかというと、まずはストリーミングフォーマットのサポートからです。

Apache Arrowフォーマットデータはストリーミングフォーマットまたはファイルフォーマットとしてやりとりされるということはすでに説明した通りです。そのため、最終的にこの2つのフォーマットをデシリアライズする必要があります。つまり、入力が2パターンあり、1つがストリーミングフォーマット、もう1つがファイルフォーマットということです。出力はApache Arrowフォーマットデータからゼロコピーで構築されたメモリー上のテーブルデータです。ここでは、メモリー上のテーブルデータのことを「Apache Arrowデータ」と呼ぶことにします。「Apache Arrowフォーマットデータ」と「Apache Arrowデータ」で紛らわしいですが。。。

パターン1:ストリーミングフォーマット→デシリアライザー→Apache Arrowデータ
パターン2:ファイルフォーマット→デシリアライザー→Apache Arrowデータ

実は、ファイルフォーマットはストリーミングフォーマットをラップしたものになっています。そのため、ファイルフォーマットをデシリアライズするときにはストリーミングフォーマットもデシリアライズしないといけないのです。よって、まずはストリーミングフォーマットからサポートしていくのがよいのです。

+--------------------------------+
| ファイルフォーマット           |
| +============================+ |
| | ストリーミングフォーマット | |
| +============================+ |
+--------------------------------+

ストリーミングフォーマットは仕様にある通り、次のような構造になっています。

<SCHEMA>
<DICTIONARY 0>
...
<DICTIONARY k - 1>
<RECORD BATCH 0>
...
<DICTIONARY x DELTA>
...
<DICTIONARY y DELTA>
...
<RECORD BATCH n - 1>
<EOS [optional]: 0xFFFFFFFF 0x00000000>

<XXX>が1つの論理的なデータです。つまり、次の4種類の論理的なデータがあります。

  • <SCHEMA>
  • <DICTIONARY>
  • <RECORD BATCH>
  • <EOS>

このうち、<EOS>はリテラル(0xFFFFFFFF 0x00000000)で、それ以外はそれぞれ独立した「カプセル化されたメッセージフォーマット("encapsulated" message format)」のデータです。カプセル化されたメッセージフォーマットは次のようになっています。

<continuation: 0xFFFFFFFF>
<metadata_size: int32>
<metadata_flatbuffer: bytes>
<padding>
<message body>

最初の32bit(<continuation>)にリテラルで0xFFFFFFFFがあります。これはこのメッセージがvalidであることを示しています。実は、0xFFFFFFFFは↑のストリーミングフォーマットの<EOS>の最初の32bitでもあります。これは、次の<metadata_size>と一緒に考えるとわかりやすいです。

次の32bit(<metadata_size>)はそのあとの<metadata_flatbuffer><padding>のサイズを示しています。リトルエンディアンで単位はバイトです。ここのサイズが0だと<continuation><metadata_size>0xFFFFFFFF 0x00000000になります。これは<EOS>そのものです。つまり、<metadata_size>が0のカプセル化されたメッセージは<EOS>を表現しているということです。(↑で<EOS>はリテラルでカプセル化されたメッセージではないと説明しましたが、<EOS>も含めてカプセル化されたメッセージとして実装してもよいです。)

次の<metadata_flatbuffer>にはFlatBuffersのメッセージが入っているので、ここのデータをFlatBuffersのデータとして処理します。処理方法は後述しますが、処理するときはそのあとの<padding>も含めてFlatBuffersのデータとして処理しないといけません。

FlatBuffersデータ
+------------------------------+
| <metadata_flatbuffer: bytes> |
| <padding>                    |
+------------------------------+

なぜかというと、<metadata_flatbuffer>だけだと8バイトの倍数のサイズになっていないかもしれないからです。FlatBuffersはデータのサイズは8バイトの倍数になっていることを要求しています。<padding>は8バイトの倍数にするためのものです。これを抜くと不正なFlatBuffersデータになる可能性があります。

そのあとの<message body>はあることもないこともあります。あるかどうかは<metadata_flatbuffer>を処理するとわかります。この中に<message body>のサイズが入っているのです。0の場合は<message body>がないということです。

メタデータのデシリアライズ

それでは<metadata_flatbuffer>のデシリアライズについて説明します。

これはFlatBuffersデータで、使っているスキーマはMessage.fbsです。この中にあるtable MessageがこのFlatBuffersデータです。

table Message {
  version: org.apache.arrow.flatbuf.MetadataVersion;
  header: MessageHeader;
  bodyLength: long;
  custom_metadata: [ KeyValue ];
}

最終的にすべてちゃんと処理することになりますが、初期実装ではheaderbodyLengthだけ処理すれば十分です。bodyLengthは前述の<message body>のサイズなので、<message body>を処理する説明のところで触れます。ここではheaderについて説明します。

headerを見るとこのメッセージの種類がわかります。ストリーミングフォーマットでは<EOS>を除くと次の論理的なデータが使われていると説明しましたが、headerを見るとこのうちのどれかがわかります。

  • <SCHEMA>
  • <DICTIONARY>
  • <RECORD BATCH>

headerの型のMessageHeaderは次のようになっています。

union MessageHeader {
  Schema, DictionaryBatch, RecordBatch, Tensor, SparseTensor
}

FlatBuffersの詳細は別の記事で説明しますが、unionとなっているところが重要です。MessageHeaderの値は次のどれかの型の値であるという意味です。たとえば、Schema型なら<SCHEMA>データでRecordBatch型なら<RECORD BATCH>データということです。

  • Schema
  • DictionaryBatch
  • RecordBatch
  • Tensor
  • SparseTensor

ストリーミングフォーマットの論理的なデータのうち、初期実装で処理するのは<SCHEMA><RECORD BATCH>だけです。<DICTIONARY>は後回しでよいです。<DICTIONARY>は辞書型というちょっと複雑な型をサポートするときに必要になるだけなので、そのときまでは気にしなくてよいです。

<SCHEMA>のデシリアライズ

それでは、まずは<SCHEMA>の処理方法を説明しましょう。<SCHEMA>の情報は<RECORD BATCH>の処理をするときにも必要になるのです。

<SCHEMA>にはスキーマが入っています。これはFlatBuffersだけで表現されています。Schema.fbsで次のように定義されています。

table Schema {
  /// endianness of the buffer
  /// it is Little Endian by default
  /// if endianness doesn't match the underlying system then the vectors need to be converted
  endianness: Endianness=Little;

  fields: [Field];
  // User-defined metadata
  custom_metadata: [ KeyValue ];

  /// Features used in the stream/file.
  features : [ Feature ];
}

初期実装で気にしないといけないのはfieldsだけです。ここにレコードバッチにはどんなカラムがあってそれぞれの型はなにかなどが入っています。

Fieldの定義はこうです。

table Field {
  /// Name is not required (e.g., in a List)
  name: string;

  /// Whether or not this field can contain nulls. Should be true in general.
  nullable: bool;

  /// This is the type of the decoded value if the field is dictionary encoded.
  type: Type;

  /// Present only if the field is dictionary encoded.
  dictionary: DictionaryEncoding;

  /// children apply only to nested data types like Struct, List and Union. For
  /// primitive types children will have length 0.
  children: [ Field ];

  /// User-defined metadata
  custom_metadata: [ KeyValue ];
}

初期実装で処理しないといけないのはnamenullabletypeだけです。それぞれ、カラム名・カラムにnullを入れられるか・カラムの型を表しています。Apache Arrowがサポートしている型には、数値や文字列などのプリミティブな型と他の型を含むリストや構造体と行ったネストされた型があります。初期実装ではプリミティブな型のみをサポートするのがよいです。プリミティブな型を処理するだけならnamenullabletypeだけあれば十分なのです。

Typeの定義はこうです。

union Type {
  Null,
  Int,
  FloatingPoint,
  Binary,
  Utf8,
  Bool,
  Decimal,
  Date,
  Time,
  Timestamp,
  Interval,
  List,
  Struct_,
  Union,
  FixedSizeBinary,
  FixedSizeList,
  Map,
  Duration,
  LargeBinary,
  LargeUtf8,
  LargeList,
  RunEndEncoded,
  BinaryView,
  Utf8View,
  ListView,
  LargeListView,
}

これもMessageHeaderのようにunionになっています。Intなら数値型、Utf8なら文字列型というようになります。このうち、プリミティブ型は次の通りです。初期実装ではこれらをサポートすることを目指します。

  • Null
  • Int
  • FloatingPoint
  • Binary
  • Utf8
  • Bool
  • Decimal
  • Date
  • Time
  • Timestamp
  • Interval
  • FixedSizeBinary

これらの型の中にはパラメーターがあるものとないものがあります。たとえば、Intはパラメーターがあるもので、Utf8はないものです。

Intの定義は次のようになっています。

table Int {
  bitWidth: int; // restricted to 8, 16, 32, and 64 in v1
  is_signed: bool;
}

何bitかと符号のありなしです。Int8/UInt16/...などを定義するのではなく、これらをIntで表現しているのでパラメーターがあります。

Utf8の定義は次のようになっています。

table Utf8 {
}

空っぽですね。

一般的に、パラメーターがない型の方が実装しやすいので、パラメーターがない型から実装するのがよいでしょう。ただ、IntUtf8にくらべてすごく難しいかというとそれほどでもないので、パラメーターがある型から実装するのもよいでしょう。動作確認のしやすさなどのバランスを考えて順序を決めるとよいでしょう。

<SCHEMA>bodyLengthは常に0です。つまり、<message body>はありません。

<RECORD BATCH>のデシリアライズ

<SCHEMA>fieldsでスキーマがわかるので、それをもとに<RECORD BATCH>を処理します。

RecordBatchの定義はMessage.fbsにあります。

table RecordBatch {
  /// number of records / rows. The arrays in the batch should all have this
  /// length
  length: long;

  /// Nodes correspond to the pre-ordered flattened logical schema
  nodes: [FieldNode];

  /// Buffers correspond to the pre-ordered flattened buffer tree
  ///
  /// The number of buffers appended to this list depends on the schema. For
  /// example, most primitive arrays will have 2 buffers, 1 for the validity
  /// bitmap and 1 for the values. For struct arrays, there will only be a
  /// single buffer for the validity (nulls) bitmap
  buffers: [Buffer];

  /// Optional compression of the message body
  compression: BodyCompression;

  /// Some types such as Utf8View are represented using a variable number of buffers.
  /// For each such Field in the pre-ordered flattened logical schema, there will be
  /// an entry in variadicBufferCounts to indicate the number of number of variadic
  /// buffers which belong to that Field in the current RecordBatch.
  ///
  /// For example, the schema
  ///     col1: Struct<alpha: Int32, beta: BinaryView, gamma: Float64>
  ///     col2: Utf8View
  /// contains two Fields with variadic buffers so variadicBufferCounts will have
  /// two entries, the first counting the variadic buffers of `col1.beta` and the
  /// second counting `col2`'s.
  ///
  /// This field may be omitted if and only if the schema contains no Fields with
  /// a variable number of buffers, such as BinaryView and Utf8View.
  variadicBufferCounts: [long];
}

初期実装で処理しないといけないのはlengthnodesbuffersです。compressionはデータ圧縮をサポートするまで後回しにしてよいです。variadicBufferCountsUtf8Viewなどのビュー系の型をサポートするまで後回しにしてよいです。

lengthはレコード数です。

nodesは各カラムの値の数とnullの数を示しています。nodesが使っているFieldNodeは次のようになっています。

struct FieldNode {
  /// The number of value slots in the Arrow array at this level of a nested
  /// tree
  length: long;

  /// The number of observed nulls. Fields with null_count == 0 may choose not
  /// to write their physical validity bitmap out as a materialized buffer,
  /// instead setting the length of the bitmap buffer to 0.
  null_count: long;
}

lengthがカラムの値の数でnull_countnullの数です。

buffersには各カラムの中身そのもの、ではなく、中身がある場所とサイズが入っています。中身自体は<metadata body>にあります。Bufferの定義はSchema.fbsにあります。

struct Buffer {
  /// The relative offset into the shared memory page where the bytes for this
  /// buffer starts
  offset: long;

  /// The absolute length (in bytes) of the memory buffer. The memory is found
  /// from offset (inclusive) to offset + length (non-inclusive). When building
  /// messages using the encapsulated IPC message, padding bytes may be written
  /// after a buffer, but such padding bytes do not need to be accounted for in
  /// the size here.
  length: long;
}

offset<metadata body>内でのオフセットで、lengthがサイズです。どちらも単位はバイトです。

1つのカラムに1つのBufferが紐づいているわけではないことに注意してください。それぞれの型ごとにレイアウトが違います。たとえば、数値は固定長プリミティブレイアウトを使います。

次はInt32型の[1, null, 2, 4, 8]の例です。

* Length: 5, Null count: 1
* Validity bitmap buffer:

  | Byte 0 (validity bitmap) | Bytes 1-63            |
  |--------------------------|-----------------------|
  | 00011101                 | 0 (padding)           |

* Value Buffer:

  | Bytes 0-3   | Bytes 4-7   | Bytes 8-11  | Bytes 12-15 | Bytes 16-19 | Bytes 20-63           |
  |-------------|-------------|-------------|-------------|-------------|-----------------------|
  | 1           | unspecified | 2           | 4           | 8           | unspecified (padding) |

「Validity bitmap buffer」と「Value buffer」で2つのBufferを使っています。このカラムに紐づいている最初のBufferが「Validity bitmap buffer」で、次のBufferが「Value buffer」になります。「Validity bitmap buffer」はnullの要素を示すBufferで、nullがない場合はこのバッファー自体が存在しないこともあります。この場合でもBuffer自体は存在するのですが、lengthは0になっています。

カラムのデシリアライズ

<RECORD BATCH>内にある各カラムは物理メモリーレイアウトに説明されているとおりに処理します。<message body>内にあるデータはここで説明されているレイアウトで配置されているので、それをゼロコピーで切り出して適切な値として参照できるようにします。注意点ですが、中身を処理する必要はありません。参照できるようにするだけで十分です。

たとえば、Rubyでデシリアライザーを実装しているとします。デシリアライズした結果をUInt8Arrayオブジェクトとして表現しましょう。

class UInt8Array
end

このとき、次のようにApache ArrowのメモリーレイアウトのデータをRubyの数値に変換してはいけません。

class UInt8Array
  def initialize(values)
    @values = values
  end
end

def deserialize(input)
  # ...
  value_buffer_flatbuffer = record_batch_message.buffers[...]
  value_buffer = record_batch_message.body[value_buffer_flatbuffers.offset,
                                           value_buffer_flatbuffers.length]
  # Apache ArrowのメモリーレイアウトのUInt8のデータをRubyの数値に変換している!
  UInt8Array.new(value_buffer.unpack("C*"))
end

そうではなくて、Apache Arrowのメモリーレイアウトのバッファーそのものをゼロコピーで参照するだけにします。

class View
  def initialize(buffer, offset, length)
    @buffer = buffer
    @offset = offset
    @length = length
  end
end

class UInt8Array
  def initialize(values)
    @values = values
  end
end

def deserialize(input)
  # ...
  value_buffer_flatbuffer = record_batch_message.buffers[...]
  value_buffer = View.new(record_batch_message.body,
                          value_buffer_flatbuffers.offset,
                          value_buffer_flatbuffers.length)
  UInt8Array.new(value_buffer) # Apache Arrowのメモリーレイアウトのまま参照
end

アプリケーションがこの値をRubyで処理したくなったらそのときにはじめてRubyの数値に変換します。ただ、それは遅いので、基本的にはApache Arrowのメモリーレイアウトのまま処理する方法を使うのがベターです。たとえば、Numo::NArrayを使えばそのようなことができます。

いずれにしても、デシリアライザーがやることではありません。次のようにRubyの数値に変換するような便利メソッドを定義してもよいですが、デシリアライズするタイミングで使うべきではなく、アプリケーションが必要になった時点で呼び出すくらいのものです。

class View
  def initialize(buffer, offset, length)
    @buffer = buffer
    @offset = offset
    @length = length
  end

  def unpack(format)
    @buffer.unpack(format, offset: @offset)
  end
end

class UInt8Array
  def initialize(values)
    @values = values
  end

  def to_a
    @values.unpack("C*")
  end
end

各プリミティブ型のデシリアライズの方法についてはこの記事ではこれ以上説明しません。

ネストされた型についてもこの記事では説明しません。

Apache Arrowフォーマットデータシリアライザーの実装

それではシリアライザーの実装も説明しましょう、と言いたいところですが、だいぶ長くなってしまったので別の記事にします。

まとめ

Apache Arrowフォーマットのシリアライザー・デシリアライザーを実装したい人向けにApache Arrowフォーマットの概観とデシリアライザーの初期実装に役立つ情報をまとめました。シリアライザーの初期実装およびデシリアライザー・シリアライザーのその後の実装についてはまた今度。。。あ、ファイルフォーマットのデシリアライザーの説明もしていないですね。それもまた今度。。。

Apache Arrowに詳しい私にApache Arrow関連のサポートを頼みたいという場合はクリアコードのApache Arrowサービスをどうぞ。