本記事ではgRPCのServer streaming RPC でのフロー制御をサンプルコードを交えながら解説します。
gRPCとは
gRPCはGoogleによって開発されたRPCプロトコルで、通信にHTTP2上でProtocol Buffersによるシリアライゼーションを利用することで、高性能(軽量、高速)なメッセージングを可能としています。
以下の表では、現在主流である3つのメッセージ交換仕様を比較しています。
方式 | スキーマ記述言語(IDL) | プロトコル | シリアライゼーション | 特徴 |
---|---|---|---|---|
gRPC | Protocol Buffers | HTTP2 | Protocol Buffers | ハイパフォーマンスでマイクロサービス間通信に最適 |
REST | OpenAPI(Swagger) | HTTP | JSON | デファクトスタンダードであるRESTの延長線上なため使いやすい |
GraphQL | GraphQL | HTTP | JSON | フロントエンドとの相性が良い |
※3つの方式ともに、シリアライゼーションには他の仕様を選択することも可能ですが、一般的なものを記載しています
いずれかの方式に統一することもできますが、大規模なシステムに於いては、マイクロサービス間通信にgRPC、フロントエンドとの通信にGraphQL、外部公開するAPIにOpenAPIといったように、ユースケースに応じて使い分けるのも良いと思います。
gRPCを利用してみた感想ですが、パフォーマンスもさることながら、OpenAPIと比べて仕様が単純な分、IDLを元に生成される雛形(スタブ)の品質が高く複数言語での利用をスムーズに行うことができそうだと感じました。
gRPCの呼び出しスタイル
gRPCの呼び出しスタイルには、単項RPCと3つ(サーバ/クライアント/双方向)のストリーミングRPCがあります。
詳しくはRPC life cycleをご覧ください。
本記事ではServer streaming RPCについてフォーカスします。
サーバーストリーミングとフロー制御
gRPCの定義から生成されるスタブは、基本的にノンブブロッキングなプログラミングスタイルで利用されるように設計されています。
Server streaming RPCでストリームにメッセージを流し続けた場合、送信側が受信側のスループットを超えて全力でメッセージを流し続けると、通信用のバッファを圧迫していくことになります。
この問題を回避するために、データ転送時に送信側が受信側に合わせて転送レートをコントロールする、フロー制御を行う必要があります。
ブロッキングスタイルであればバッファが溢れそうになるとブロックしてメモリが空くまで待つ、という実装をすればRPCの実装者にフロー制御を意識させないようにできそうです。
しかし、(シングルスレッドの)ノンブロッキングスタイルの場合、ブロック=全処理停止となるためそうはいきません。
gRPCでフロー制御を行うにはどうすればよいか調べてみました。
サンプル実装
フロー制御のサンプルとして、gRPCでLinuxのyesコマンドっぽいものを実装しました。
実装にSpringBootとKotlinを利用しており、全ソースはGitHubで公開しています。
無限ループで全力送信
手始めに、受信側を考慮せず全力無限ループでメッセージを送信し続ける yes メソッドを作成しました。
1 2 3 4 5 6 |
override fun yes(request: Yes.YesRequest, responseObserver: StreamObserver<Yes.YesReply>) { val reply = Yes.YesReply.newBuilder().setMessage(request.messageWithDefault).build() while (true) { responseObserver.onNext(reply) } } |
案の定、呼び出すとすぐにバッファ(NettyのDirectMemory)が溢れます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
Exception in thread "grpc-default-executor-1" io.grpc.netty.shaded.io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 1778385175, max: 1791492096) at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:656) at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:611) at io.grpc.netty.shaded.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:768) at io.grpc.netty.shaded.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:744) at io.grpc.netty.shaded.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:245) at io.grpc.netty.shaded.io.netty.buffer.PoolArena.allocate(PoolArena.java:215) at io.grpc.netty.shaded.io.netty.buffer.PoolArena.allocate(PoolArena.java:147) at io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:327) at io.grpc.netty.shaded.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187) at io.grpc.netty.shaded.io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:123) at io.grpc.netty.shaded.io.grpc.netty.NettyWritableBufferAllocator.allocate(NettyWritableBufferAllocator.java:51) at io.grpc.internal.MessageFramer.writeKnownLengthUncompressed(MessageFramer.java:226) at io.grpc.internal.MessageFramer.writeUncompressed(MessageFramer.java:167) at io.grpc.internal.MessageFramer.writePayload(MessageFramer.java:140) ...snip... |
余談ですが、JavaでのgRPCでよく利用されるNettyはネットワーク処理にDirectBufferを使うため、メモリ管理はヒープ以外にもダイレクトメモリ(XX:MaxDirectMemorySizeなど)を意識する必要があります。
フロー制御版
gRPCには自動フロー制御機構が備わっており、意識したプログラムをすればバックプレッシャーが機能します。
このissueに記載されていたサンプルコード通りに実装したyesWithFcメソッドがこちら。
1 2 3 4 5 6 7 8 9 10 11 |
override fun yesWithFc(request: Yes.YesRequest, responseObserver: StreamObserver<Yes.YesReply>) { require(responseObserver is ServerCallStreamObserver<Yes.YesReply>) { "response observer is not ServerCallStreamObserver" } val reply = Yes.YesReply.newBuilder().setMessage(request.messageWithDefault).build() val drain = { while (responseObserver.isReady()) { responseObserver.onNext(reply) } } responseObserver.setOnReadyHandler(drain) drain() } |
フロー制御の要点は以下になります。
- ServerCallStreamObserverのonReadyハンドラに送信ロジックを登録する
- responseObserverがisReadyであればデータを送信する
- responseObserverがisReadyでなければ一旦抜ける、続きは次回のonReadyハンドラが呼ばれた際に送る
- 初回にdrainを直接呼び出している(本家のサンプルコードもこうなっていた)のは、ハンドラをセットするタイミングの問題でonReadyHandlerが呼ばれなかった場合の保険?
呼び出したまま一晩放置してみたのですが、翌朝も元気にメッセージを流し続けていました。
サンプル実装ではループ内の処理にIO処理などが無いため、単純な実装で済んでいますが、ブロックキング処理やイベントハンドラを使ったノンブロッキングIOなどが絡む場合は少し工夫が必要になるでしょう。
まとめ
gRPCのサーバーストリーミングでは、何も考えずにメッセージを流し続けるとバッファが溢れてしまうので、防止するためにはフロー制御を意識した実装が必要なので気をつけましょう。