# 3.2 gRPC
# 3.2.1 gRPC介绍
Jupiter微服务目前支持gRPC,Jupiter对gRPC服务提供了很多可观察性的手段。
内置了多个中间件,可以采集请求日志、采集trace、采集监控、采集慢日志,更加方便我们对gRPC服务的可观测。
通过govern的治理端口,能够查看监控、HTTP实时信息
# 3.2.2 配置规范
# 3.2.3 直连的gRPC
# 3.2.3.1 启动gRPC服务
配置项
[jupiter.server.grpc]
    port = 9091
2
代码
func main() {
    eng := NewEngine()
    eng.SetGovernor("127.0.0.1:9092")
    if err := eng.Run(); err != nil {
        xlog.Panic(err.Error())
    }
}
type Engine struct {
    jupiter.Application
}
func NewEngine() *Engine {
    eng := &Engine{}
    if err := eng.Startup(
        eng.serveGRPC,
    ); err != nil {
        xlog.Panic("startup", xlog.Any("err", err))
    }
    return eng
}
func (eng *Engine) serveGRPC() error {
    server := xgrpc.StdConfig("grpc").Build()
    helloworld.RegisterGreeterServer(server.Server, new(Greeter))
    return eng.Serve(server)
}
type Greeter struct{}
func (g Greeter) SayHello(context context.Context, request *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
    return &helloworld.HelloReply{
        Message: "Hello Jupiter",
    }, nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
运行指令go run main.go --config=config.toml,可以看到以下运行结果
 从图中可以看到,我们启动了一个
从图中可以看到,我们启动了一个gRPC服务运行在9091端口,接下来我们启动客户端
# 3.2.3.2 启动gRPC客户端
配置项
[jupiter.grpc.directserver]
    addr = "127.0.0.1:9091"
    balancerName = "round_robin" # 默认值
    dialTimeout = "3s" # 默认值
2
3
4
5
代码
func main() {
    eng := NewEngine()
    if err := eng.Run(); err != nil {
        xlog.Error(err.Error())
    }
}
type Engine struct {
    jupiter.Application
}
func NewEngine() *Engine {
    eng := &Engine{}
    if err := eng.Startup(
        eng.consumer,
    ); err != nil {
        xlog.Panic("startup", xlog.Any("err", err))
    }
    return eng
}
func (eng *Engine) consumer() error {
    conn := grpc.StdConfig("directserver").Build()
    client := helloworld.NewGreeterClient(conn)
    for {
        resp, err := client.SayHello(context.Background(), &helloworld.HelloRequest{
            Name: "jupiter",
        })
        if err != nil {
            xlog.Error(err.Error())
        } else {
            xlog.Info("receive response", xlog.String("resp", resp.Message))
        }
        time.Sleep(1 * time.Second)
    }
    return nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
我们的gRPC客户端通过配置里的地址和负载均衡算法,可以请求刚才我们启动的gRPC服务端。运行指令go run main.go --config=config.toml,可以看到以下运行结果
 我们定时1s,发送
我们定时1s,发送hello给gRPC服务端,可以收到服务端响应的Hello Jupiter
# 3.2.4 注册ETCD的gRPC服务
参考gRPC注册ETCD示例 (opens new window)
# 3.2.4.2 启动gRPC服务
配置项
[jupiter.server.grpc]
    port = 9091  # 服务端grpc绑定端口
[jupiter.registry.wh]  # 注册grpc到etcd的配置
    connectTimeout = "1s"
    endpoints=["127.0.0.1:2379"]  # grpc注册到目标etcd中
    secure = false
    prefix = "wsd-reg" # 服务端注册到etcd的key前缀,配置客户端时候应该保持一致
2
3
4
5
6
7
代码
package main
import (
    "context"
    "fmt"
    "github.com/douyu/jupiter"
    compound_registry "github.com/douyu/jupiter/pkg/registry/compound"
    etcdv3_registry "github.com/douyu/jupiter/pkg/registry/etcdv3"
    "github.com/douyu/jupiter/pkg/server/xgrpc"
    "github.com/douyu/jupiter/pkg/xlog"
    "google.golang.org/grpc/examples/helloworld/helloworld"
)
func main() {
    eng := NewEngine()
    eng.SetRegistry(
        compound_registry.New(
            etcdv3_registry.StdConfig("wh").Build(),
        ),
    )
    //eng.SetGovernor("0.0.0.0:0")
    if err := eng.Run(); err != nil {
        xlog.Error(err.Error())
    }
}
type Engine struct {
    jupiter.Application
}
func NewEngine() *Engine {
    eng := &Engine{}
    if err := eng.Startup(
        eng.serveGRPC,
    ); err != nil {
        xlog.Panic("startup", xlog.Any("err", err))
    }
    return eng
}
func (eng *Engine) serveGRPC() error {
    server := xgrpc.StdConfig("grpc").Build()
    helloworld.RegisterGreeterServer(server.Server, new(Greeter))
    return eng.Serve(server)
}
type Greeter struct {
    server *xgrpc.Server
}
func (Greeter) SayHello(context context.Context, request *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
    sd := &helloworld.HelloReply{
        Message: "返回信息给client",
    }
    fmt.Println(fmt.Sprintf("name:%s",request.Name))
    return sd,nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
运行指令go run main.go --config=config.toml,可以看到以下运行结果
 从图中可以看到,我们启动了一个
从图中可以看到,我们启动了一个gRPC服务运行在9091端口,在命令行的第四行,展示了我们注册的key和value信息。接下来我们在启动客户端。
# 3.2.4.2 启动gRPC客户端
配置项
[jupiter.registry.wh]
    connectTimeout = "1s"
    endpoints=["127.0.0.1:2379"]
    secure = false
    prefix = "wsd-reg" # 服务端注册到etcd的key前缀,配置客户端时候应该保持一致
[jupiter.grpc.etcdserver]
    addr = "etcd:///main" #etcd:/// 默认前缀, main 指的是应用执行二进制文件名称(发布平台将它默认为应用名称). 框架内部会把 main解析出来跟前缀做拼接,去etcd找到对应grpc服务端注册key
    balancerName = "round_robin" # 默认值,grpc客户端调用服务端采用的 轮训模式
    dialTimeout = "3s" # 默认值
    
2
3
4
5
6
7
8
9
10
11
grpc客户端代码demo
package main
import (
    "context"
    "fmt"
    "time"
    "github.com/douyu/jupiter"
    "github.com/douyu/jupiter/pkg/client/grpc"
    "github.com/douyu/jupiter/pkg/client/grpc/balancer"
    "github.com/douyu/jupiter/pkg/client/grpc/resolver"
    "github.com/douyu/jupiter/pkg/registry/etcdv3"
    "github.com/douyu/jupiter/pkg/xlog"
    "google.golang.org/grpc/examples/helloworld/helloworld"
)
func main() {
    eng := NewEngine()
    if err := eng.Run(); err != nil {
        xlog.Error(err.Error())
    }
    fmt.Printf("111 = %+v\n", 111)
}
type Engine struct {
    jupiter.Application
}
func NewEngine() *Engine {
    eng := &Engine{}
    if err := eng.Startup(
        eng.initResolver,
        eng.consumer,
    ); err != nil {
        xlog.Panic("startup", xlog.Any("err", err))
    }
    return eng
}
func (eng *Engine) initResolver() error {
    resolver.Register("etcd", etcdv3.StdConfig("wh").Build())
    return nil
}
func (eng *Engine) consumer() error {
    config := grpc.StdConfig("etcdserver")
    //config.BalancerName = balancer.NameSmoothWeightRoundRobin
    client := helloworld.NewGreeterClient(config.Build())
    go func() {
        i:=0
        ghj := map[string]int{}
        for {
            i++
            resp, err := client.SayHello(context.Background(), &helloworld.HelloRequest{
                Name: fmt.Sprintf("jupiter:%d",i),
            })
            if err != nil {
                fmt.Printf("err = %+v\n%s", err,"iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii")
                xlog.Error(err.Error())
            } else {
                ghj[resp.Message] = ghj[resp.Message] + 1
                fmt.Printf("resp.Message = %+v\n", ghj)
                xlog.Info("receive response", xlog.String("resp", resp.Message))
            }
            time.Sleep(1 * time.Second)
        }
    }()
    return nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
运行指令go run main.go --config=config.toml,可以看到以下运行结果
 我们的
我们的gRPC客户端通过应用名称main从ETCD中获取到服务地址,并监听了/wsd-reg/main,用于后续更新服务地址。
客户端会定时1s,发送hello给gRPC服务端,可以收到服务端响应的``Jupiter 1类似信息到服务端
# 3.2.4.3 从零开始配置
环境要求 jupiter(commit:8b67ebec1ae6dc07e8df27d7240aa9b4d954671b)如果用的jupiter就用1.8
生成potobuf
syntax = "proto3";
package pb;
service Hello {
  // SayHello
  rpc SayHello(SayHelloReq) returns (SayHelloRes);
}
message SayHelloReq{
  string name = 1;
}
message SayHelloRes{
  string resp = 1;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
执行命令: protoc -I . --go_out=plugins=grpc:. ./hello.proto 拿到对应pb包放入对应项目的路径
go build -o jupiter-demo main.go  记得把下面的demo编译成jupiter-demo,再执行
jupiter 客户端 使用demo
# 配置
[jupiter.registry.wh]
connectTimeout = "1s"
endpoints=["127.0.0.1:2379"]
secure = false
prefix = "wsd-reg"  # 前缀
[jupiter.grpc.etcdserver]
addr = "etcd:///jupiter-demo" # jupiter-demo 指的是执行文件名称,在发布平台执行文件名称跟应用名称是一样的
dialTimeout = "3s" # 默认值
package main
import (
    "clientb/pb"
    "context"
    "fmt"
    "time"
    "github.com/douyu/jupiter"
    "github.com/douyu/jupiter/pkg/client/grpc"
    "github.com/douyu/jupiter/pkg/client/grpc/balancer"
    "github.com/douyu/jupiter/pkg/client/grpc/resolver"
    "github.com/douyu/jupiter/pkg/registry/etcdv3"
    "github.com/douyu/jupiter/pkg/xlog"
)
func main() {
    eng := NewEngine()
    if err := eng.Run(); err != nil {
        xlog.Error(err.Error())
    }
    fmt.Printf("111 = %+v\n", 111)
}
type Engine struct {
    jupiter.Application
}
func NewEngine() *Engine {
    eng := &Engine{}
    if err := eng.Startup(
        eng.initResolver,
        eng.consumer,
    ); err != nil {
        xlog.Panic("startup", xlog.Any("err", err))
    }
    return eng
}
func (eng *Engine) initResolver() error {
    resolver.Register("etcd", etcdv3.StdConfig("wh").Build())
    return nil
}
func (eng *Engine) consumer() error {
    config := grpc.StdConfig("etcdserver")
    config.BalancerName = balancer.NameSmoothWeightRoundRobin
    client := pb.NewHelloClient(config.Build())
    go func() {
        i:=0
        ghj := map[string]int{}
        for {
            i++
            resp, err := client.SayHello(context.Background(), &pb.SayHelloReq{
                Name: fmt.Sprintf("jupiter:%d",i),
            })
            if err != nil {
                fmt.Printf("err = %+v\n%s", err,"iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii")
                xlog.Error(err.Error())
            } else {
                ghj[resp.Resp] = ghj[resp.Resp] + 1
                fmt.Printf("resp.Message = %+v\n", ghj)
                xlog.Info("receive response", xlog.String("resp", resp.Resp))
            }
            time.Sleep(1 * time.Second)
        }
    }()
    return nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
jupiter grpc 客户端 demo
# 配置
[app]
[app.registry.etcd]# 使用etcd作为服务发现
endpoints=["127.0.0.1:2379"] #etcd的地址,grpc的服务端须已经注册到这个etcd
timeout="2s"
[jupiter]
[jupiter.grpc]
[jupiter.grpc.wsg-reg] # wsg-reg 这里得注意,这是服务端的前缀
debug = true # Debug开关
enableMetric = true # 指标采集开关
enableAccessLog = true # 访问日志开关
addr = "jupiter-demo" #目标地址。direct=true,该值设为服务ip:port, direct=false,则为服务注册名,其实就是执行文件名称,也就是应用名称
dialTimeout = "1s" # 拨超时
readTimeout = "1s" # 读超时
enableTrace = false # 链路追踪开关
balancerName = "round_robin" # 默认为round_robin
level = "panic" # 创建时的告警等级,level=panic创建Client失败时panic
wait = true # 默认:true 是否一直等待直到连接建立,wait=true时,dialTimeout失效。注意Wait可能会导致创建过程阻塞
direct = false # 直连服务,不经过负载均衡器
slowThreshold = "1s" # slow日志门限值
package main
import (
    "context"
    "demomimi/pb"
    "fmt"
    "time"
)
import "github.com/douyu/jupiter/client/gusty"
// 新建demo客户端
var (
    DemoClient pb.HelloClient
)
// 读取demo的grpc配置,并初始化
func init() {
    DemoClient = pb.NewHelloClient(gusty.Invoker("wsg-reg"))
}
func main()  {
    sdfg := map[string]int{}
    for{
        resp,err:=SayHello()
        if err !=nil{
            fmt.Println(err,"uuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu")
            continue
        }
        sdfg[resp.Resp] = sdfg[resp.Resp] + 1
        fmt.Println("uuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu",sdfg)
        time.Sleep(time.Second * 1)
    }
}
// SayHello 调用该grpc的方法
func SayHello() (*pb.SayHelloRes, error) {
    ctx := context.Background()
    ctx, grpcTimeOut := context.WithTimeout(ctx, 1*time.Second)
    defer grpcTimeOut()
    helloRes, err := DemoClient.SayHello(
        ctx,
        &pb.SayHelloReq{
            Name: fmt.Sprintf("word"),
        },
    )
    if err != nil {
        return &pb.SayHelloRes{}, err
    }
    return helloRes, nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
jupiter 服务端调用
[jupiter.server.grpc]
    port = 20102
[jupiter.registry.wh]
connectTimeout = "1s"
endpoints=["127.0.0.1:2379"]
secure = false
prefix = "wsd-reg" # 这个前缀记得跟client保持一致
package main
import (
    "context"
    "fmt"
    "github.com/douyu/jupiter"
    compound_registry "github.com/douyu/jupiter/pkg/registry/compound"
    etcdv3_registry "github.com/douyu/jupiter/pkg/registry/etcdv3"
    "github.com/douyu/jupiter/pkg/server/xgrpc"
    "github.com/douyu/jupiter/pkg/xlog"
    "google.golang.org/grpc/examples/helloworld/helloworld"
)
func main() {
    eng := NewEngine()
    eng.SetRegistry(
        compound_registry.New(
            etcdv3_registry.StdConfig("wh").Build(),
        ),
    )
    //eng.SetGovernor("0.0.0.0:0")
    if err := eng.Run(); err != nil {
        xlog.Error(err.Error())
    }
}
type Engine struct {
    jupiter.Application
}
func NewEngine() *Engine {
    eng := &Engine{}
    if err := eng.Startup(
        eng.serveGRPC,
    ); err != nil {
        xlog.Panic("startup", xlog.Any("err", err))
    }
    return eng
}
func (eng *Engine) serveGRPC() error {
    fmt.Println("YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY")
    server := xgrpc.StdConfig("grpc").Build()
    helloworld.RegisterGreeterServer(server.Server, new(Greeter))
    return eng.Serve(server)
}
type Greeter struct {
    server *xgrpc.Server
}
func (Greeter) SayHello(context context.Context, request *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
    sd := &helloworld.HelloReply{
        Message: "我是client_b",
    }
    fmt.Println(fmt.Sprintf("name:%s",request.Name))
    return sd,nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
← 3.1 HTTP 3.3 Worker →