blog.syfm

徒然なるままに考えていることなどを書いていくブログ

gRPC のトランスポートを任意の実装に差し替えられる grpchan を試す

GitHub を眺めていたら protoreflect や gRPCurl で有名な jhump 氏が fullstorydev org 配下に面白いリポジトリを公開していることに気づいた。

github.com

grpchan は gRPC Channel の抽象を定義・提供し、HTTP/2 ではなくインメモリや HTTP/1.1 などのトランスポートを使うことを可能にしている。 grpchan のドキュメント に具体的な使い方も含めて詳細が書かれている。

Hello, world

実際に grpchan を試してみる。grpchan はライブラリと Protocol Buffers プラグイン (protoc-gen-grpchan) に分かれており、gRPC プラグイン (protoc-gen-go-grpc) で自動生成されたコードではなく protoc-gen-grpchan によって自動生成されたコードを利用することで Channel を差し替えることができる。

$ protoc --grpchan_out helloworld --proto_path helloworld --go_out=plugins=grpc:helloworld helloworld/helloworld.proto

生成されたコードは量もなく理解しやすい。
protoc-gen-go-grpc で生成されたコードでは *grpc.ClientConnInvoke メソッドを内部的に叩いているが、protoc-gen-grpchan では grpchan.Channel というインターフェースに差し替わっているところが大きな違いで、これを NewGreeterChannelClient から DI できるようになっている。

// Code generated by protoc-gen-grpchan. DO NOT EDIT.
// source: helloworld.proto

package helloworld

import "github.com/fullstorydev/grpchan"
import "golang.org/x/net/context"
import "google.golang.org/grpc"

func RegisterHandlerGreeter(reg grpchan.ServiceRegistry, srv GreeterServer) {
    reg.RegisterService(&_Greeter_serviceDesc, srv)
}

type greeterChannelClient struct {
    ch grpchan.Channel
}

func NewGreeterChannelClient(ch grpchan.Channel) GreeterClient {
    return &greeterChannelClient{ch: ch}
}

func (c *greeterChannelClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
    out := new(HelloReply)
    err := c.ch.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

クライアントは先程の NewGreeterChannelClient を使って初期化する。ここでは grpchan.Channel の HTTP/1.1 実装の httpgrpc.Channel を利用する。

package main

import (
    "context"
    "fmt"
    "net/http"
    "net/url"

    "github.com/fullstorydev/grpchan/httpgrpc"
    "github.com/ktr0731/grpchan-playground/helloworld"
)

func main() {
    u, err := url.Parse("http://127.0.0.1:50051")
    if err != nil {
        panic(err)
    }

    client := helloworld.NewGreeterChannelClient(
        &httpgrpc.Channel{
            Transport: http.DefaultTransport,
            BaseURL:   u,
        },
    )
    res, err := client.SayHello(context.Background(), &helloworld.HelloRequest{
        Name: "ktr",
    })
    if err != nil {
        panic(err)
    }
    fmt.Println(res.Message)
}

サーバは RegisterHandlerGreeter を使ってサービスを登録し、通常の HTTP サーバのように http.ListenAndService を使って起動する。

package main

import (
    "context"
    "fmt"
    "net/http"

    "github.com/fullstorydev/grpchan"
    "github.com/fullstorydev/grpchan/httpgrpc"
    "github.com/ktr0731/grpchan-playground/helloworld"
    "google.golang.org/grpc"
)

func main() {
    reg := grpchan.HandlerMap{}
    helloworld.RegisterHandlerGreeter(&reg, &server{})

    srv := grpc.NewServer()
    reg.ForEach(srv.RegisterService)

    httpgrpc.HandleServices(http.HandleFunc, "/", reg, nil, nil)

    http.ListenAndServe(":50051", nil)
}

type server struct{}

func (s *server) SayHello(ctx context.Context, req *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
    return &helloworld.HelloReply{Message: fmt.Sprintf("hello, %s", req.Name)}, nil
}

サーバを起動し、クライアントを実行するとレスポンスがちゃんと返ってくる。

$ go run client/main.go
hello, ktr

curl で叩いてみてもちゃんと動作した。拙作の pb を使って入出力のエンコード・デコードを行った。

$ in="$(echo '{"name": "ktr"}' | pb -F helloworld/helloworld.proto encode helloworld.HelloRequest)"
$ curl -s -d "$in" -X POST -H 'Content-Type: application/x-protobuf' http://127.0.0.1:50051/helloworld.Greeter/SayHello | pb -F helloworld/helloworld.proto decode helloworld.HelloReply
{
  "message": "hello, ktr"
}

vs gRPC

HTTP/2 を利用した gRPC と比較すると、トランスポートに依存する点を中心として以下のような違いがある。ちゃんと考えてないのでもっとあるはず。

  • grpc.DialOption が使えない
  • grpc.StatsHandler が使えない (未実装)
  • gRPC リフレクションが使えない (未実装)
  • metadata や grpc-statusgrpc-status-details-bingrpc-timeout のような header/trailer に依存する部分が独自実装

また、HTTP/1.1 を実装として利用する場合、以下のような制約がある。これは gRPC-Web の制約と同様。

  • Bidirectional Streaming が限定的なサポート (レスポンスが返ったあとに再度リクエストを投げることができない)

vs improbable-eng/grpc-web

improbable-eng/grpc-web (以下 gRPC-Web) と比較すると以下のような違いがある。

  • gRPC-Web は gRPC サーバを HTTP サーバでラップし、HTTP → gRPC への変換を行うので gRPC サーバへ独自のメカニズムを導入しないのに対し、grpchan はトランスポートを入れ替えている
  • grpchan は特定のトランスポートのみを使用するので、gRPC-Web のように HTTP/1.1 と HTTP/2 を同時にサポートすることができない
  • gRPC-Web は gRPC Status をレスポンスボディに格納するのに対し、grpchan は X-GRPC-Status ヘッダに格納する
    • 同様に Status Details や Trailer もレスポンスボディではなく独自のヘッダに格納する

より詳しい違いはドキュメントに記述されているので読んでみると良さそう。

pkg.go.dev

gRPC リフレクションはなにをしているか?

gRPC リフレクションは、対象の gRPC サーバがどのようなサービス、メソッドを公開しているかを知るための機能です。

gRPC を使う上でリフレクションを有効にすると、gRPCurl や Evans といったツールを使う際に Protocol Buffers などの IDL を直接的に読み込まずにメソッドを呼び出すことができてとても便利ですが、gRPC リフレクションはなにをしていて、ツールは gRPC リフレクションをどうやって使っているのでしょうか?
それなりに複雑でしばしば忘れるので記事にしておこうと思います。なお、この記事では gRPC の実装として grpc/grpc-go、Protocol Buffers の実装として protocolbuffers/protobuf-go を参照します。

gRPC リフレクションを有効化しているサーバの例

grpc/grpc-go に gRPC リフレクションの例があるのでそこから抜粋します。 このサーバには GreeterServerEchoServer という gRPC サービスがあり、GreeterServer には SayHello というメソッドがあります。

gRPC リフレクションを使っていないサーバとの差異は reflection.Register(s) を呼ぶか呼ばないかという点しかありません。

func main() {
    flag.Parse()
    lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    fmt.Printf("server listening at %v\n", lis.Addr())

    s := grpc.NewServer()

    // Register Greeter on the server.
    hwpb.RegisterGreeterServer(s, &hwServer{})

    // Register RouteGuide on the same server.
    ecpb.RegisterEchoServer(s, &ecServer{})

    // Register reflection service on gRPC server.
    reflection.Register(s)

    if err := s.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}

reflection.Register は以下のように、gRPC サーバに ServerReflectionServer を登録している処理しかありません。これは上記の RegisterGreeterServerRegisterEchoServer と同じような gRPC サービスの登録処理です。つまり、gRPC リフレクション自体も gRPC サービスとして実装されているということになります。

func Register(s *grpc.Server) {
    rpb.RegisterServerReflectionServer(s, &serverReflectionServer{
        s: s,
    })
}

gRPC リフレクションの API 定義

gRPC リフレクションの API 定義は grpc/grpc に置かれています。

grpc/reflection.proto at master · grpc/grpc · GitHub

コメント等を削除した定義は以下のようになっています。

syntax = "proto3";

package grpc.reflection.v1alpha;

service ServerReflection {
  rpc ServerReflectionInfo(stream ServerReflectionRequest) returns (stream ServerReflectionResponse);
}

message ServerReflectionRequest {
  string host = 1;
  // To use reflection service, the client should set one of the following
  // fields in message_request. The server distinguishes requests by their
  // defined field and then handles them using corresponding methods.
  oneof message_request {
    string file_by_filename = 3;

    string file_containing_symbol = 4;

    ExtensionRequest file_containing_extension = 5;

    string all_extension_numbers_of_type = 6;

    string list_services = 7;
  }
}

message ExtensionRequest {
  string containing_type = 1;
  int32 extension_number = 2;
}

message ServerReflectionResponse {
  string valid_host = 1;
  ServerReflectionRequest original_request = 2;
  oneof message_response {
    FileDescriptorResponse file_descriptor_response = 4;

    ExtensionNumberResponse all_extension_numbers_response = 5;

    ListServiceResponse list_services_response = 6;

    ErrorResponse error_response = 7;
  }
}

message FileDescriptorResponse {
  repeated bytes file_descriptor_proto = 1;
}

message ExtensionNumberResponse {
  string base_type_name = 1;
  repeated int32 extension_number = 2;
}

message ListServiceResponse {
  repeated ServiceResponse service = 1;
}

message ServiceResponse {
  string name = 1;
}

message ErrorResponse {
  int32 error_code = 1;
  string error_message = 2;
}

ServerReflectionInfo という Bidirectional Streaming なメソッドがあり、message_request で指定されたフィールドに対応してレスポンスが返されます。 gRPC クライアントはこの ServerReflectionInfo から欲しい情報をリクエストし、メソッドを呼び出します。

メソッドを呼び出すために必要な情報を取得する

例えば SayHello を呼び出すことを考えてみましょう。

まず、SayHello が属しているサービスを知る必要があります。そのためにはまず gRPC サーバが公開しているサービスの一覧を取得しなければいけません。これには list_services フィールドを使用します。
Evans で実際に gRPC リフレクションサービスの ServerReflectionRequest を叩いてみます。

f:id:ktr_0731:20200621200850p:plain

このように、grpc.examples.echo.Echogrpc.reflection.v1alpha.ServerReflectionhelloworld.Greeter が公開されていることが分かりました。

次に、これらのサービス名からメソッドを取得する必要があります。これには file_containing_symbol を使用します。

f:id:ktr_0731:20200621201500p:plain

Base64エンコードされた FileDescriptor が返ってきました。descriptor とは Protocol Buffers のシンボルをエンコード・デコードするために必要なメタデータの集合のことで、FileDescriptor はその名前の通り、Protocol Buffers の定義が記述されたファイルの descriptor です。今回の場合だと helloworld.Greeter が定義されているファイルである helloworld.proto がソースとなっています。

FileDescriptor 自体も Protocol Buffers で定義されたメッセージ型で、Go だと FileDescriptorProto という型で自動生成されています。FileDescriptorProto のフィールドには ServiceDescriptorProto のスライスがあり、このファイルに定義されているサービスの descriptor が含まれていることを意味しています。さらに、ServiceDescriptorProto のフィールドには MethodDescriptorProto があり、SayHello も一つの MethodDescriptorProto として表現されています。

以下のようなコードを書けば descriptor をデコードできます。

package main

import (
    "encoding/base64"
    "fmt"
    "log"

    "github.com/golang/protobuf/protoc-gen-go/descriptor"
    "google.golang.org/protobuf/proto"
)

func main() {
    var out []byte
    in := "Ci9leGFtcGxlcy9oZWxsb3dvcmxkL2hlbGxvd29ybGQvaGVsbG93b3JsZC5wcm90bxIKaGVsbG93b3JsZCIiCgxIZWxsb1JlcXVlc3QSEgoEbmFtZRgBIAEoCVIEbmFtZSImCgpIZWxsb1JlcGx5EhgKB21lc3NhZ2UYASABKAlSB21lc3NhZ2UySQoHR3JlZXRlchI+CghTYXlIZWxsbxIYLmhlbGxvd29ybGQuSGVsbG9SZXF1ZXN0GhYuaGVsbG93b3JsZC5IZWxsb1JlcGx5IgBCZwobaW8uZ3JwYy5leGFtcGxlcy5oZWxsb3dvcmxkQg9IZWxsb1dvcmxkUHJvdG9QAVo1Z29vZ2xlLmdvbGFuZy5vcmcvZ3JwYy9leGFtcGxlcy9oZWxsb3dvcmxkL2hlbGxvd29ybGRiBnByb3RvMw=="
    out, err := base64.StdEncoding.DecodeString(in)
    if err != nil {
        log.Fatal(err)
    }

    var m descriptor.FileDescriptorProto
    if err := proto.Unmarshal(out, &m); err != nil {
        log.Fatal(err)
    }

    fmt.Println(*m.Name) // examples/helloworld/helloworld/helloworld.proto
    fmt.Println(*m.Service[0].Name) // Greeter
    fmt.Println(*m.Service[0].Method[0].Name) // SayHello
}

メソッドのリクエストとレスポンス型の名前が MethodDescriptorProto に、message の descriptor である MessageDescriptor の一覧が FileDescriptorProto に含まれているため、これにより MessageDescriptorProto も取得することができます。

gRPC リフレクションを使って必要な情報をすべて集めることができました。

Protocol Buffers の message の構築

メソッド呼び出しに必要な情報はすべて集まったため、リクエストを送ることはできるようになりましたが、実際のリクエストの値はどのように作れば良いのでしょうか?
MessageDescriptor は手に入っていますが、実際の message 型 (proto.Message を実装する型) は手に入っていません。通常であれば protoc-gen-go により生成された型を使うことができますが、gRPC リフレクションを使う場合は自動生成された型が手に入りません。

これに対応するには、proto.Message を実装する動的な汎用型を定義する必要があります。Go でいえば jhump/protoreflect の dynamic.Message、protocolbuffers/protobuf-go の dynamic.Message がそれに相当します。

これらは内部的に MessageDescriptor を持ち、MessageDescriptor と入力値を元にエンコード・デコードを行っています。

descriptor はどこから手に入れるのか?

先の例では FileDescriptor や ServiceDescriptor などの descriptor が出てきましたが、そもそも gRPC サーバはそれらをどのようにして手に入れているのでしょうか?
これは完全に実装の話なので Go 以外では全く異なった実装になっていると思います。

FileDescriptor

Go では、Protocol Buffers で定義された型から protoc-gen-go を使い、対応する Go の型を自動生成することができます。
FileDescriptor はこの自動生成されたファイルの中に含まれています。

GreeterServer が定義されている helloworld.proto の場合、そこから自動生成された helloworld.pb.go の末尾に FileDescriptor を値として持つ変数があります。

grpc-go/helloworld.pb.go at 9a465503579e4f97b81d4e2ddafdd1daef80aa93 · grpc/grpc-go · GitHub

そしてこの自動生成されたファイル内の init ではこの FileDescriptor を使って以下のような処理を行っています。

func init() {
    proto.RegisterFile("examples/helloworld/helloworld/helloworld.proto", fileDescriptor_b83ea99a5323a2c7)
}

これは、proto パッケージに FileDescriptor を登録している処理です。
ここで登録された FileDescriptor は proto パッケージの FileDescriptor 関数により取得することができます。

ServiceDescriptor

gRPC + Protocol Buffers において、 ServiceDescriptor は二種類あります。一つは gRPC における ServiceDescriptor である grpc.ServiceDesc です。こちらは gRPC サービスの名前、サービスの具象型、ハンドラやストリームの情報を含む型です。
もう一つは Protocol Buffers で定義された Protocol Buffers の ServiceDescriptor である ServiceDescriptorProto です。こちらは gRPC には全く依存していません。

gRPC の ServiceDescriptor は自動生成されたファイルに含まれており、gRPC サービスの登録処理を行う際に gRPC サーバ内部に保持されます。GreeterServer であれば、RegisterGreeterServer の部分です。

func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {
    s.RegisterService(&_Greeter_serviceDesc, srv)
}

この _Greeter_serviceDesc が protoc-gen-go-grpc により自動生成される ServiceDescriptor で、これは以下のようになっています。

var _Greeter_serviceDesc = grpc.ServiceDesc{
    ServiceName: "helloworld.Greeter",
    HandlerType: (*GreeterServer)(nil),
    Methods: []grpc.MethodDesc{
        {
            MethodName: "SayHello",
            Handler:    _Greeter_SayHello_Handler,
        },
    },
    Streams:  []grpc.StreamDesc{},
    Metadata: "examples/helloworld/helloworld/helloworld.proto",
}

この ServiceDescriptor の Metadata フィールドに入っているのは元になった Protocol Buffers ファイルの URL です。gRPC リフレクションはこの Metadata から proto.FileDescriptor 関数を呼び出すことで FileDescriptor および FileDescriptor のフィールドから ServiceDescriptor を取得しています。

まとめ

この記事では gRPC リフレクションが有効化された gRPC サーバの例からはじめ、gRPC リフレクションの API 定義およびそれらの API がなにを行っているかを簡単に説明しました。
また、FileDescriptor や ServiceDescriptor について、どのように生成・登録され、gRPC リフレクションはそれらをどのように取得しているかを説明しました。

おそらく他の gRPC 実装ではまた違った方法を使っていると思うので興味がある方は読んでみてください。

Go の t.Cleanup がとてもべんり

Go 1.14 で testing パッケージに新しく t.Cleanup(func())b.Cleanup(func()) が導入されました。
最初は今まで defer を使っていたところを置き換えられるくらいしか良いところがないかな〜と思っていましたが、想像以上に柔軟な使い方ができるので今まで使用したパターンを書いておきます。

Cleanup の特徴

テストランナーは panic ハンドラがあるので、Cleanup は panic が起きたとしても常に呼び出されます。例えば、以下のコードではちゃんと called が出力されます。

func Test_main(t *testing.T) {
    t.Cleanup(func() {
        fmt.Println("called")
    })
    panic("")
}

The Go Playground

別 goroutine で panic した場合や t.Fatal 系が呼ばれた場合でも常に呼び出されます。

追記:
柴田さんからの指摘により、goroutine のスケジューリングを待っていなかったので呼び出されていたことがわかりました。

以下のコードを実行すると t.Cleanup は呼ばれていないことがわかります。

The Go Playground

ドキュメントにもある通り、Cleanup に登録した関数は defer と同様に FILO で呼び出されていきます。以下のコードだと 3、2、1 の順番となります。

func Test_main(t *testing.T) {
    t.Cleanup(func() {
        fmt.Println("1")
    })
    t.Cleanup(func() {
        fmt.Println("2")
    })
    t.Cleanup(func() {
        fmt.Println("3")
    })
}

The Go Playground

Cleanupdefer では defer のほうが先に呼び出されます。

func Test_main(t *testing.T) {
    t.Cleanup(func() {
        fmt.Println("cleanup")
    })
    defer func() {
        fmt.Println("defer")
    }()
}

The Go Playground

また、伝搬はしないので t.Run によってサブテストを作ったとしても、サブテストの終了時に Cleanup が呼ばれることはありません。

役に立ちそうなパターン

環境変数の切り替え

テスト時に動的に環境変数を与えたい場合、今までは以下のようなコードを書いていました。

func setEnv(t *testing.T, k, v string) func() {
    old := os.Getenv(k)
    if err := os.Setenv(k, v); err != nil {
        t.Fatal(err)
    }
    return func() {
        if err := os.Setenv(k, old); err != nil {
            t.Fatal(err)
        }
    }
}

func Test_main(t *testing.T) {
    unsetEnv := setEnv(t, "foo", "bar")
    defer unsetEnv()
}

Cleanup を使うと、unsetEnv の呼び出しを caller にさせずに元に戻すことができます。

func setEnv(t *testing.T, k, v string) func() {
    old := os.Getenv(k)
    if err := os.Setenv(k, v); err != nil {
        t.Fatal(err)
    }
    t.Cleanup(func() {
        if err := os.Setenv(k, old); err != nil {
            t.Fatal(err)
        }
    })
}

func Test_main(t *testing.T) {
    setEnv(t, "foo", "bar")
}

クライアントとサーバの用意

Web API サーバのような、クライアントとサーバが存在する場合は、それらの用意を Cleanup を使って簡潔に書くことができます。 今までだと、例えば HTTP サーバのテストをしたい場合は以下のようなコードを書くことになると思います。

func Test_main(t *testing.T) {
    srv := newServer(t)
    go func() {
        if err := srv.ListenAndServe(); err != http.ErrServerClosed {
            t.Error(err)
        }
    }()

    cli := newClient(t)

    // Test with client.

    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()
    if err := srv.Shutdown(ctx); err != nil {
        t.Fatal(err)
    }
}

func newServer(t *testing.T) *http.Server {
    mux := http.NewServeMux()

    // Register handlers.

    return &http.Server{Addr: ":8080", Handler: mux}
}

func newClient(t *testing.T) *http.Client {
    return http.DefaultClient
}

このコードだと、テスト関数である Test_main 内にテストの setup/teardown ロジックが多く入り込んでしまっていて可読性も良くないし、別なテスト関数で同じようなことをやりたいときにこれらのロジックを再び書くことになってしまいます。

t.Cleanup を使うと以下のように caller 側の setup/teardown ロジックを callee に隠蔽することができます。

func Test_main(t *testing.T) {
    cli := newClientAndRunServer(t)

    // Test with client.
}

func newServer(t *testing.T) *http.Server {
    mux := http.NewServeMux()

    // Register handlers.

    return &http.Server{Addr: ":8080", Handler: mux}
}

func newClientAndRunServer(t *testing.T) *http.Client {
    srv := newServer(t)
    go func() {
        if err := srv.ListenAndServe(); err != http.ErrServerClosed {
            t.Error(err)
        }
    }()
    t.Cleanup(func() {
        ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
        defer cancel()
        if err := srv.Shutdown(ctx); err != nil {
            t.Fatal(err)
        }
    })

    return http.DefaultClient
}

サーバの起動から停止まで全てを newClientAndRunServer に入れることができるので caller 側はサーバについて何も考えなくても良くなりました。

setup で複数の goroutine を起動したい場合

上記のクライアント・サーバの応用系として、サーバだけでなく非同期で動くワーカー goroutine などが必要である場合も sync.WaitGrouperrgroup.Group を使って簡潔に書くことができます。

func Test_main(t *testing.T) {
    cli := newClientAndRunServer(t)

    // Test with client.
}

func newServer(t *testing.T) *http.Server {
    mux := http.NewServeMux()

    // Register handlers.

    return &http.Server{Addr: ":8080", Handler: mux}
}

func newClientAndRunServer(t *testing.T) *http.Client {
    ctx, cancel := context.WithCancel(context.Background())
    eg, cctx := errgroup.WithContext(ctx)
    t.Cleanup(func() {
        cancel()
        if err := eg.Wait(); err != nil {
            t.Fatal(err)
        }
    })

    srv := newServer(t)
    eg.Go(func() error {
        if err := srv.ListenAndServe(); err != http.ErrServerClosed {
            return err
        }
        return nil
    })
    t.Cleanup(func() {
        ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
        defer cancel()
        if err := srv.Shutdown(ctx); err != nil {
            t.Fatal(err)
        }
    })
    eg.Go(func() error {
        return runWorker1(cctx)
    })
    eg.Go(func() error {
        return runWorker2(cctx)
    })

    return http.DefaultClient
}

べんりですね。