# 3.2 gRPC

# 3.2.1 gRPC介绍

Jupiter微服务目前支持gRPCJupitergRPC服务提供了很多可观察性的手段。

内置了多个中间件,可以采集请求日志、采集trace、采集监控、采集慢日志,更加方便我们对gRPC服务的可观测。

通过govern的治理端口,能够查看监控、HTTP实时信息

# 3.2.2 配置规范

配置说明

# 3.2.3 直连的gRPC

参考gRPC直连示例 (opens new window)

# 3.2.3.1 启动gRPC服务

配置项

[jupiter.server.grpc]
    port = 9091
1
2

代码

func main() {
    eng := NewEngine()
    eng.SetGovernor("127.0.0.1:9092")
    if err := eng.Run(); err != nil {
        xlog.Panic(err.Error())
    }
}

type Engine struct {
    jupiter.Application
}

func NewEngine() *Engine {
    eng := &Engine{}

    if err := eng.Startup(
        eng.serveGRPC,
    ); err != nil {
        xlog.Panic("startup", xlog.Any("err", err))
    }
    return eng
}

func (eng *Engine) serveGRPC() error {
    server := xgrpc.StdConfig("grpc").Build()
    helloworld.RegisterGreeterServer(server.Server, new(Greeter))
    return eng.Serve(server)
}

type Greeter struct{}

func (g Greeter) SayHello(context context.Context, request *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
    return &helloworld.HelloReply{
        Message: "Hello Jupiter",
    }, nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

运行指令go run main.go --config=config.toml,可以看到以下运行结果 image 从图中可以看到,我们启动了一个gRPC服务运行在9091端口,接下来我们启动客户端

# 3.2.3.2 启动gRPC客户端

配置项

[jupiter.grpc.directserver]
    addr = "127.0.0.1:9091"
    balancerName = "round_robin" # 默认值
    dialTimeout = "3s" # 默认值

1
2
3
4
5

代码


func main() {
    eng := NewEngine()
    if err := eng.Run(); err != nil {
        xlog.Error(err.Error())
    }
}

type Engine struct {
    jupiter.Application
}

func NewEngine() *Engine {
    eng := &Engine{}
    if err := eng.Startup(
        eng.consumer,
    ); err != nil {
        xlog.Panic("startup", xlog.Any("err", err))
    }
    return eng
}

func (eng *Engine) consumer() error {
    conn := grpc.StdConfig("directserver").Build()
    client := helloworld.NewGreeterClient(conn)
    for {
        resp, err := client.SayHello(context.Background(), &helloworld.HelloRequest{
            Name: "jupiter",
        })
        if err != nil {
            xlog.Error(err.Error())
        } else {
            xlog.Info("receive response", xlog.String("resp", resp.Message))
        }
        time.Sleep(1 * time.Second)
    }
    return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

我们的gRPC客户端通过配置里的地址和负载均衡算法,可以请求刚才我们启动的gRPC服务端。运行指令go run main.go --config=config.toml,可以看到以下运行结果 image 我们定时1s,发送hellogRPC服务端,可以收到服务端响应的Hello Jupiter

# 3.2.4 注册ETCD的gRPC服务

参考gRPC注册ETCD示例 (opens new window)

# 3.2.4.2 启动gRPC服务

配置项

[jupiter.server.grpc]
    port = 9091  # 服务端grpc绑定端口
[jupiter.registry.wh]  # 注册grpc到etcd的配置
    connectTimeout = "1s"
    endpoints=["127.0.0.1:2379"]  # grpc注册到目标etcd中
    secure = false
    prefix = "wsd-reg" # 服务端注册到etcd的key前缀,配置客户端时候应该保持一致
1
2
3
4
5
6
7

代码

package main

import (
    "context"
    "fmt"
    "github.com/douyu/jupiter"
    compound_registry "github.com/douyu/jupiter/pkg/registry/compound"
    etcdv3_registry "github.com/douyu/jupiter/pkg/registry/etcdv3"
    "github.com/douyu/jupiter/pkg/server/xgrpc"
    "github.com/douyu/jupiter/pkg/xlog"
    "google.golang.org/grpc/examples/helloworld/helloworld"
)

func main() {
    eng := NewEngine()
    eng.SetRegistry(
        compound_registry.New(
            etcdv3_registry.StdConfig("wh").Build(),
        ),
    )
    //eng.SetGovernor("0.0.0.0:0")
    if err := eng.Run(); err != nil {
        xlog.Error(err.Error())
    }
}

type Engine struct {
    jupiter.Application
}

func NewEngine() *Engine {
    eng := &Engine{}
    if err := eng.Startup(
        eng.serveGRPC,
    ); err != nil {
        xlog.Panic("startup", xlog.Any("err", err))
    }
    return eng
}

func (eng *Engine) serveGRPC() error {

    server := xgrpc.StdConfig("grpc").Build()
    helloworld.RegisterGreeterServer(server.Server, new(Greeter))
    return eng.Serve(server)
}

type Greeter struct {
    server *xgrpc.Server
}


func (Greeter) SayHello(context context.Context, request *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
    sd := &helloworld.HelloReply{
        Message: "返回信息给client",
    }

    fmt.Println(fmt.Sprintf("name:%s",request.Name))
    return sd,nil
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

运行指令go run main.go --config=config.toml,可以看到以下运行结果 image 从图中可以看到,我们启动了一个gRPC服务运行在9091端口,在命令行的第四行,展示了我们注册的keyvalue信息。接下来我们在启动客户端。

# 3.2.4.2 启动gRPC客户端

配置项

[jupiter.registry.wh]
    connectTimeout = "1s"
    endpoints=["127.0.0.1:2379"]
    secure = false
    prefix = "wsd-reg" # 服务端注册到etcd的key前缀,配置客户端时候应该保持一致

[jupiter.grpc.etcdserver]
    addr = "etcd:///main" #etcd:/// 默认前缀, main 指的是应用执行二进制文件名称(发布平台将它默认为应用名称). 框架内部会把 main解析出来跟前缀做拼接,去etcd找到对应grpc服务端注册key
    balancerName = "round_robin" # 默认值,grpc客户端调用服务端采用的 轮训模式
    dialTimeout = "3s" # 默认值
    
1
2
3
4
5
6
7
8
9
10
11

grpc客户端代码demo

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/douyu/jupiter"
    "github.com/douyu/jupiter/pkg/client/grpc"
    "github.com/douyu/jupiter/pkg/client/grpc/balancer"
    "github.com/douyu/jupiter/pkg/client/grpc/resolver"
    "github.com/douyu/jupiter/pkg/registry/etcdv3"
    "github.com/douyu/jupiter/pkg/xlog"
    "google.golang.org/grpc/examples/helloworld/helloworld"
)

func main() {
    eng := NewEngine()
    if err := eng.Run(); err != nil {
        xlog.Error(err.Error())
    }
    fmt.Printf("111 = %+v\n", 111)
}

type Engine struct {
    jupiter.Application
}

func NewEngine() *Engine {
    eng := &Engine{}
    if err := eng.Startup(
        eng.initResolver,
        eng.consumer,
    ); err != nil {
        xlog.Panic("startup", xlog.Any("err", err))
    }
    return eng
}

func (eng *Engine) initResolver() error {
    resolver.Register("etcd", etcdv3.StdConfig("wh").Build())
    return nil
}

func (eng *Engine) consumer() error {
    config := grpc.StdConfig("etcdserver")
    //config.BalancerName = balancer.NameSmoothWeightRoundRobin
    client := helloworld.NewGreeterClient(config.Build())

    go func() {
        i:=0
        ghj := map[string]int{}
        for {

            i++
            resp, err := client.SayHello(context.Background(), &helloworld.HelloRequest{
                Name: fmt.Sprintf("jupiter:%d",i),
            })
            if err != nil {
                fmt.Printf("err = %+v\n%s", err,"iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii")
                xlog.Error(err.Error())
            } else {
                ghj[resp.Message] = ghj[resp.Message] + 1
                fmt.Printf("resp.Message = %+v\n", ghj)
                xlog.Info("receive response", xlog.String("resp", resp.Message))
            }
            time.Sleep(1 * time.Second)
        }
    }()
    return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71

运行指令go run main.go --config=config.toml,可以看到以下运行结果 image 我们的gRPC客户端通过应用名称mainETCD中获取到服务地址,并监听了/wsd-reg/main,用于后续更新服务地址。

客户端会定时1s,发送hellogRPC服务端,可以收到服务端响应的``Jupiter 1类似信息到服务端

# 3.2.4.3 从零开始配置

环境要求 jupiter(commit:8b67ebec1ae6dc07e8df27d7240aa9b4d954671b)如果用的jupiter就用1.8

生成potobuf

syntax = "proto3";
package pb;

service Hello {
  // SayHello
  rpc SayHello(SayHelloReq) returns (SayHelloRes);
}

message SayHelloReq{
  string name = 1;
}

message SayHelloRes{
  string resp = 1;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

执行命令: protoc -I . --go_out=plugins=grpc:. ./hello.proto 拿到对应pb包放入对应项目的路径

go build -o jupiter-demo main.go 记得把下面的demo编译成jupiter-demo,再执行 jupiter 客户端 使用demo

# 配置
[jupiter.registry.wh]
connectTimeout = "1s"
endpoints=["127.0.0.1:2379"]
secure = false
prefix = "wsd-reg"  # 前缀

[jupiter.grpc.etcdserver]
addr = "etcd:///jupiter-demo" # jupiter-demo 指的是执行文件名称,在发布平台执行文件名称跟应用名称是一样的
dialTimeout = "3s" # 默认值

package main

import (
    "clientb/pb"
    "context"
    "fmt"
    "time"

    "github.com/douyu/jupiter"
    "github.com/douyu/jupiter/pkg/client/grpc"
    "github.com/douyu/jupiter/pkg/client/grpc/balancer"
    "github.com/douyu/jupiter/pkg/client/grpc/resolver"
    "github.com/douyu/jupiter/pkg/registry/etcdv3"
    "github.com/douyu/jupiter/pkg/xlog"
)

func main() {
    eng := NewEngine()
    if err := eng.Run(); err != nil {
        xlog.Error(err.Error())
    }
    fmt.Printf("111 = %+v\n", 111)
}

type Engine struct {
    jupiter.Application
}

func NewEngine() *Engine {
    eng := &Engine{}
    if err := eng.Startup(
        eng.initResolver,
        eng.consumer,
    ); err != nil {
        xlog.Panic("startup", xlog.Any("err", err))
    }
    return eng
}

func (eng *Engine) initResolver() error {
    resolver.Register("etcd", etcdv3.StdConfig("wh").Build())
    return nil
}

func (eng *Engine) consumer() error {
    config := grpc.StdConfig("etcdserver")
    config.BalancerName = balancer.NameSmoothWeightRoundRobin
    client := pb.NewHelloClient(config.Build())
    go func() {
        i:=0
        ghj := map[string]int{}
        for {

            i++
            resp, err := client.SayHello(context.Background(), &pb.SayHelloReq{
                Name: fmt.Sprintf("jupiter:%d",i),
            })
            if err != nil {
                fmt.Printf("err = %+v\n%s", err,"iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii")
                xlog.Error(err.Error())
            } else {
                ghj[resp.Resp] = ghj[resp.Resp] + 1
                fmt.Printf("resp.Message = %+v\n", ghj)
                xlog.Info("receive response", xlog.String("resp", resp.Resp))
            }
            time.Sleep(1 * time.Second)
        }
    }()
    return nil
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82

jupiter grpc 客户端 demo

# 配置
[app]
[app.registry.etcd]# 使用etcd作为服务发现
endpoints=["127.0.0.1:2379"] #etcd的地址,grpc的服务端须已经注册到这个etcd
timeout="2s"
[jupiter]
[jupiter.grpc]
[jupiter.grpc.wsg-reg] # wsg-reg 这里得注意,这是服务端的前缀
debug = true # Debug开关
enableMetric = true # 指标采集开关
enableAccessLog = true # 访问日志开关
addr = "jupiter-demo" #目标地址。direct=true,该值设为服务ip:port, direct=false,则为服务注册名,其实就是执行文件名称,也就是应用名称
dialTimeout = "1s" # 拨超时
readTimeout = "1s" # 读超时
enableTrace = false # 链路追踪开关
balancerName = "round_robin" # 默认为round_robin
level = "panic" # 创建时的告警等级,level=panic创建Client失败时panic
wait = true # 默认:true 是否一直等待直到连接建立,wait=true时,dialTimeout失效。注意Wait可能会导致创建过程阻塞
direct = false # 直连服务,不经过负载均衡器
slowThreshold = "1s" # slow日志门限值


package main

import (
    "context"
    "demomimi/pb"
    "fmt"
    "time"
)
import "github.com/douyu/jupiter/client/gusty"
// 新建demo客户端
var (
    DemoClient pb.HelloClient
)

// 读取demo的grpc配置,并初始化
func init() {

    DemoClient = pb.NewHelloClient(gusty.Invoker("wsg-reg"))
}

func main()  {
    sdfg := map[string]int{}
    for{
        resp,err:=SayHello()
        if err !=nil{
            fmt.Println(err,"uuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu")
            continue
        }
        sdfg[resp.Resp] = sdfg[resp.Resp] + 1
        fmt.Println("uuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu",sdfg)
        time.Sleep(time.Second * 1)
    }

}


// SayHello 调用该grpc的方法
func SayHello() (*pb.SayHelloRes, error) {
    ctx := context.Background()
    ctx, grpcTimeOut := context.WithTimeout(ctx, 1*time.Second)
    defer grpcTimeOut()
    helloRes, err := DemoClient.SayHello(
        ctx,
        &pb.SayHelloReq{
            Name: fmt.Sprintf("word"),
        },
    )
    if err != nil {
        return &pb.SayHelloRes{}, err
    }
    return helloRes, nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74

jupiter 服务端调用


[jupiter.server.grpc]
    port = 20102
[jupiter.registry.wh]
connectTimeout = "1s"
endpoints=["127.0.0.1:2379"]
secure = false
prefix = "wsd-reg" # 这个前缀记得跟client保持一致



package main

import (
    "context"
    "fmt"
    "github.com/douyu/jupiter"
    compound_registry "github.com/douyu/jupiter/pkg/registry/compound"
    etcdv3_registry "github.com/douyu/jupiter/pkg/registry/etcdv3"
    "github.com/douyu/jupiter/pkg/server/xgrpc"
    "github.com/douyu/jupiter/pkg/xlog"
    "google.golang.org/grpc/examples/helloworld/helloworld"
)

func main() {
    eng := NewEngine()
    eng.SetRegistry(
        compound_registry.New(
            etcdv3_registry.StdConfig("wh").Build(),
        ),
    )
    //eng.SetGovernor("0.0.0.0:0")
    if err := eng.Run(); err != nil {
        xlog.Error(err.Error())
    }
}

type Engine struct {
    jupiter.Application
}

func NewEngine() *Engine {
    eng := &Engine{}
    if err := eng.Startup(
        eng.serveGRPC,
    ); err != nil {
        xlog.Panic("startup", xlog.Any("err", err))
    }
    return eng
}

func (eng *Engine) serveGRPC() error {
    fmt.Println("YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY")
    server := xgrpc.StdConfig("grpc").Build()
    helloworld.RegisterGreeterServer(server.Server, new(Greeter))
    return eng.Serve(server)
}


type Greeter struct {
    server *xgrpc.Server
}
func (Greeter) SayHello(context context.Context, request *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
    sd := &helloworld.HelloReply{
        Message: "我是client_b",
    }
    fmt.Println(fmt.Sprintf("name:%s",request.Name))
    return sd,nil
}



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72