Presto コネクターを実装する 第三回

こんにちは。松本です。

これは TECHSCORE Advent Calendar 2015 の21日目の記事です。

前回から随分と時間が空いてしまいましたね。スズキ編集長に「まだか」と言われつつも、完全に仕事にかまけてさぼってしまいました。ごめんなさい。携わっていたプロジェクト(これとかこれ)が楽しくて仕方がなかったのです・・(本当に)。

さて、Presto のバージョンは既に 0.131 まで上がったようですが、本記事では引き続き 0.96 でのコネクター開発についてご紹介していきます。

今回はレコード操作まわりです。間が空き過ぎて前回までの内容を忘れたという方はこちらを参照下さい。

Presto コネクターを実装する 第一回
Presto コネクターを実装する 第二回

処理の分割

Presto は、ユーザーからの問い合わせをテーブル単位でコネクターに渡します。コネクターはこの問い合わせに対する処理を ConnectorSplitManager インタフェースを使い、任意に分割することができます。

8.1. SPI Overview - Presto 0.96 Documentation

The split manager partitions the data for a table into the individual chunks that Presto will distribute to workers for processing.

例えば、問い合わせ先となるテーブルのデータが、実際には複数のファイルに分割されて保存されている場合、ファイル毎に処理を分割し、それを並列処理させることで効率よく問い合わせを処理することが可能になります。Presto コネクターの実装例である Example HTTP Connector がこの例です。

次に紹介する 3 つのインタフェースがこれらの役割を担います。

ConnectorPartition はデータソースのパーティションを表すインタフェースです。

ConnectorSplit はパーティションに対するアクセス処理単位を表します。後述する RecordCursor はこの単位で生成されます。

ConnectorSplitManager は、問い合わせに対し ConnectorPartition への分割と、ConnectorSplit への分割を行います。

尚、今回のサンプルコードでは、サンプルとしてのシンプルさを優先し、処理の分割には対応していません。問い合わせに対し常に ConnectorSplit ひとつで処理を行よう実装しています。

com.techscore.example.presto.plugin.TechscorePartition

ここで実装する事になるのは partitionId プロパティと tupleDomain プロパティの Getter メソッドです。

partitionId プロパティは対象テーブルに対して一意になるよう割り当てたパーティションの ID です。サンプルコードはパーティションがひとつなのでパーティション毎に ID を割り当てる必要がなく、スキーマ名とテーブル名のみを結合した文字列を ID として使用しています。Getter メソッドの定義は lombok の @Value アノテーションによって自動生成されます。

tupleDomain はこのパーティションが受け持つデータの範囲を表します。サンプルコードではパーティションがひとつなので、常に「全て」を表す TupleDomain を返しています。

com.techscore.example.presto.plugin.TechscoreSplit

本クラスも前回の ConnectorTableHandle や ConnectorColumn 同様、@JsonCreator や @JsonProperty アノテーションを使って JSON でのシリアライズ/デシリアライズを可能とします。

address プロパティ、remotelyAccessible プロパティはデータソースへのアクセス方法のヒントになる情報ですが、サンプルコードであるため、それぞれ Getter メソッドが固定値を返すよう実装しています。

com.techscore.example.presto.plugin.TechscoreSplitManager

getPartitions() メソッドは問い合わせ内容をもとに ConnectorPartition を作成します。サンプルでは問い合わせの内容に関係なく、単一の TechscorePartition を生成しています。

ここで、引数 tupleDomain が問い合わせ条件を保持しています。tupleDomain がどのように条件を保持しているかは、次の実行結果を見るとイメージし易いでしょう。この実行結果は、Presto に対してクエリを実行した際の、引数 tupleDomain の内容を出力したものです。

select * from techscore.schema1.authors;

select * from techscore.schema1.authors where id = 1;

select * from techscore.schema1.authors where id in (1, 2);

select * from techscore.schema1.authors where id >= 1;

getPartitionSplits() メソッドは、getPartitions() で作成したパーティション情報をもとに ConnectorSplit を生成します。サンプルでは常に単一の TechscoreSplit を生成しています。

データアクセス

前述の通り Presto コネクターは ConnectorSplitManager で生成された ConnectorSplit 単位でデータソースにアクセスし、Presto にデータを返します。

この役割を担うのが、次のインタフェースです。

RecordCursor は JDBC の ResultSet のようなもので、問い合わせ結果に対し、行を進めながらフィールドデータにアクセスするインタフェースを提供します。

RecordSet は RecordCursor を生成する役割を担います。

ConnectorRecordSetProvider は ConnectorSplit 単位で RecordSet を生成する役割を担います。

尚、RecordSet、ConnectorRecordSetProvider をそれぞれ実装した TechscoreRecordSet と TechscoreRecordSetProvider クラスについては、それぞれ TechscoreRecordCursor のインスタンス、TechscoreRecordSet のインスタンスを生成しているだけなので説明を省き、ソースコードだけを掲載します。

com.techscore.example.presto.plugin.TechscoreRecordCursor

本実装では、コンストラクタに問い合わせ結果となるデータが渡される実装です。

getTotalBytes(), getCompletedBytes(), getReadTimeNanos() メソッドは各種統計情報に利用する数値を返します。サンプルコードでは実装を省略し、常に 0 を返すようにしています。

getType() メソッドは指定フィールドのデータ型を返します。

advanceNextPosition() メソッドが行を進める役割を持ちます。

getBoolean(), getLong(), getDouble(), getSlice() メソッドがカレント行のフィールドデータにアクセスする役割を担います。

Presto コネクターでは、データを次の 4 つの Java データ型で表現します。第二回でもご紹介した基本的なデータ型とのマッピングは次のようになります。

ここで、TimestampType.TIMESTAMP は日時を 1970-01-01T00:00:00 UTC からの経過ミリ秒として扱い、DateType.DATE は日付を 1970-01-01 からの経過日数として扱います。

VarcharType.VARCHAR は Slice を使います。今回は Slices.utf8Slice(String) メソッドで String から Slice への変換を行っています。

RecordCursor には Presto のバージョン 0.113 から getObject(int) という Object を返すメソッドが追加されています。ARRAY 型や MAP 型等はこのメソッドで扱う仕様に変更されています。

isNull() はカレント行の指定フィールドデータが null であることを検証するメソッドです。

com.techscore.example.presto.plugin.TechscoreRecordSet

com.techscore.example.presto.plugin.TechscoreRecordSetProvider

動作確認

デプロイ方法は、ビルドした JAR ファイル、および依存する JAR ファイルを所定のディレクトリに放り込むだけです。今回のサンプルでは Presto インストールディレクトリ直下の plugin ディレクトリに techscore というディレクトリを作成し、そこにビルド結果となる techscore-presto-plugin-1.0.jar を入れるのみです。デプロイ後は Presto を再起動する必要があります。

起動完了後、Presto のコマンドラインツールを使い、動作確認を行います。

最後に

来年は心を入れ替え、仕事はほどほどにしてブログに力を入れます!編集長!

Comments are closed, but you can leave a trackback: Trackback URL.

Advent Calendar 2015の連載記事

  1. TECHSCORE Advent Calendar 2015
  2. Redshift と PostgreSQL に同時に JDBC 接続する
  3. Lombok で Spice up your Java!
  4. 画像を指定するだけ!非デザイナーでも簡単にそれっぽい配色ができるツールを作ってみた
  5. 新卒文系エンジニアの記録:配属半年間の失敗を振り返ってみた
  6. 非同期処理のすすめ
  7. ioDrive2の導入で支える、そのIOPS - 導入検討編.
  8. GoでパイプラインからSlackに通知する
  9. fuse でオレオレファイルシステムを作ってみた (Haskell で)
  10. Erlang はじめました
  11. ちょっと地味なビルドとリリースの話 (レガシーシステム改革、はじめの一歩)
  12. Java8 最速 boolean[] to Stream 選手権
  13. Google Apps の Directory API にてWebブラウザを介さずに認証する
  14. 風データをビジュアルに表現する
  15. マイクロフレームワーク「Ninja」を使ってみる
  16. 赤ちゃんvimmerからよちよちvimmerにクラスチェンジを果たすためのTips
  17. PostgreSQL FDW を作ってSQLでログ検索してみた
  18. Goで偽名ジェネレータを作りました
  19. 書き込み中に削除されたファイルを救出する
  20. 運用情報更新のススメ
  21. ちゃんと読んでくれましたか?
  22. Presto コネクターを実装する 第三回
  23. Ruby2.3を触ってみる
  24. Git 困ったときのtips集
  25. 5分で読む入門編:Java 8 ラムダ式 コレクション編(2)リストの検索
  26. CloudFront (+ S3) + JWPLAYER で様々なデバイスのブラウザから動画をストリーミング再生する