# 3.2 gRPC
# 3.2.1 gRPC介绍
Jupiter
微服务目前支持gRPC
,Jupiter
对gRPC
服务提供了很多可观察性的手段。
内置了多个中间件,可以采集请求日志、采集trace、采集监控、采集慢日志,更加方便我们对gRPC
服务的可观测。
通过govern
的治理端口,能够查看监控、HTTP实时信息
# 3.2.2 配置规范
# 3.2.3 直连的gRPC
# 3.2.3.1 启动gRPC服务
配置项
[jupiter.server.grpc]
port = 9091
2
代码
func main() {
eng := NewEngine()
eng.SetGovernor("127.0.0.1:9092")
if err := eng.Run(); err != nil {
xlog.Panic(err.Error())
}
}
type Engine struct {
jupiter.Application
}
func NewEngine() *Engine {
eng := &Engine{}
if err := eng.Startup(
eng.serveGRPC,
); err != nil {
xlog.Panic("startup", xlog.Any("err", err))
}
return eng
}
func (eng *Engine) serveGRPC() error {
server := xgrpc.StdConfig("grpc").Build()
helloworld.RegisterGreeterServer(server.Server, new(Greeter))
return eng.Serve(server)
}
type Greeter struct{}
func (g Greeter) SayHello(context context.Context, request *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
return &helloworld.HelloReply{
Message: "Hello Jupiter",
}, nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
运行指令go run main.go --config=config.toml
,可以看到以下运行结果
从图中可以看到,我们启动了一个gRPC
服务运行在9091
端口,接下来我们启动客户端
# 3.2.3.2 启动gRPC客户端
配置项
[jupiter.grpc.directserver]
addr = "127.0.0.1:9091"
balancerName = "round_robin" # 默认值
dialTimeout = "3s" # 默认值
2
3
4
5
代码
func main() {
eng := NewEngine()
if err := eng.Run(); err != nil {
xlog.Error(err.Error())
}
}
type Engine struct {
jupiter.Application
}
func NewEngine() *Engine {
eng := &Engine{}
if err := eng.Startup(
eng.consumer,
); err != nil {
xlog.Panic("startup", xlog.Any("err", err))
}
return eng
}
func (eng *Engine) consumer() error {
conn := grpc.StdConfig("directserver").Build()
client := helloworld.NewGreeterClient(conn)
for {
resp, err := client.SayHello(context.Background(), &helloworld.HelloRequest{
Name: "jupiter",
})
if err != nil {
xlog.Error(err.Error())
} else {
xlog.Info("receive response", xlog.String("resp", resp.Message))
}
time.Sleep(1 * time.Second)
}
return nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
我们的gRPC
客户端通过配置里的地址和负载均衡算法,可以请求刚才我们启动的gRPC
服务端。运行指令go run main.go --config=config.toml
,可以看到以下运行结果
我们定时1s,发送hello
给gRPC
服务端,可以收到服务端响应的Hello Jupiter
# 3.2.4 注册ETCD的gRPC服务
参考gRPC注册ETCD示例 (opens new window)
# 3.2.4.2 启动gRPC服务
配置项
[jupiter.server.grpc]
port = 9091 # 服务端grpc绑定端口
[jupiter.registry.wh] # 注册grpc到etcd的配置
connectTimeout = "1s"
endpoints=["127.0.0.1:2379"] # grpc注册到目标etcd中
secure = false
prefix = "wsd-reg" # 服务端注册到etcd的key前缀,配置客户端时候应该保持一致
2
3
4
5
6
7
代码
package main
import (
"context"
"fmt"
"github.com/douyu/jupiter"
compound_registry "github.com/douyu/jupiter/pkg/registry/compound"
etcdv3_registry "github.com/douyu/jupiter/pkg/registry/etcdv3"
"github.com/douyu/jupiter/pkg/server/xgrpc"
"github.com/douyu/jupiter/pkg/xlog"
"google.golang.org/grpc/examples/helloworld/helloworld"
)
func main() {
eng := NewEngine()
eng.SetRegistry(
compound_registry.New(
etcdv3_registry.StdConfig("wh").Build(),
),
)
//eng.SetGovernor("0.0.0.0:0")
if err := eng.Run(); err != nil {
xlog.Error(err.Error())
}
}
type Engine struct {
jupiter.Application
}
func NewEngine() *Engine {
eng := &Engine{}
if err := eng.Startup(
eng.serveGRPC,
); err != nil {
xlog.Panic("startup", xlog.Any("err", err))
}
return eng
}
func (eng *Engine) serveGRPC() error {
server := xgrpc.StdConfig("grpc").Build()
helloworld.RegisterGreeterServer(server.Server, new(Greeter))
return eng.Serve(server)
}
type Greeter struct {
server *xgrpc.Server
}
func (Greeter) SayHello(context context.Context, request *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
sd := &helloworld.HelloReply{
Message: "返回信息给client",
}
fmt.Println(fmt.Sprintf("name:%s",request.Name))
return sd,nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
运行指令go run main.go --config=config.toml
,可以看到以下运行结果
从图中可以看到,我们启动了一个gRPC
服务运行在9091
端口,在命令行的第四行,展示了我们注册的key
和value
信息。接下来我们在启动客户端。
# 3.2.4.2 启动gRPC客户端
配置项
[jupiter.registry.wh]
connectTimeout = "1s"
endpoints=["127.0.0.1:2379"]
secure = false
prefix = "wsd-reg" # 服务端注册到etcd的key前缀,配置客户端时候应该保持一致
[jupiter.grpc.etcdserver]
addr = "etcd:///main" #etcd:/// 默认前缀, main 指的是应用执行二进制文件名称(发布平台将它默认为应用名称). 框架内部会把 main解析出来跟前缀做拼接,去etcd找到对应grpc服务端注册key
balancerName = "round_robin" # 默认值,grpc客户端调用服务端采用的 轮训模式
dialTimeout = "3s" # 默认值
2
3
4
5
6
7
8
9
10
11
grpc客户端代码demo
package main
import (
"context"
"fmt"
"time"
"github.com/douyu/jupiter"
"github.com/douyu/jupiter/pkg/client/grpc"
"github.com/douyu/jupiter/pkg/client/grpc/balancer"
"github.com/douyu/jupiter/pkg/client/grpc/resolver"
"github.com/douyu/jupiter/pkg/registry/etcdv3"
"github.com/douyu/jupiter/pkg/xlog"
"google.golang.org/grpc/examples/helloworld/helloworld"
)
func main() {
eng := NewEngine()
if err := eng.Run(); err != nil {
xlog.Error(err.Error())
}
fmt.Printf("111 = %+v\n", 111)
}
type Engine struct {
jupiter.Application
}
func NewEngine() *Engine {
eng := &Engine{}
if err := eng.Startup(
eng.initResolver,
eng.consumer,
); err != nil {
xlog.Panic("startup", xlog.Any("err", err))
}
return eng
}
func (eng *Engine) initResolver() error {
resolver.Register("etcd", etcdv3.StdConfig("wh").Build())
return nil
}
func (eng *Engine) consumer() error {
config := grpc.StdConfig("etcdserver")
//config.BalancerName = balancer.NameSmoothWeightRoundRobin
client := helloworld.NewGreeterClient(config.Build())
go func() {
i:=0
ghj := map[string]int{}
for {
i++
resp, err := client.SayHello(context.Background(), &helloworld.HelloRequest{
Name: fmt.Sprintf("jupiter:%d",i),
})
if err != nil {
fmt.Printf("err = %+v\n%s", err,"iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii")
xlog.Error(err.Error())
} else {
ghj[resp.Message] = ghj[resp.Message] + 1
fmt.Printf("resp.Message = %+v\n", ghj)
xlog.Info("receive response", xlog.String("resp", resp.Message))
}
time.Sleep(1 * time.Second)
}
}()
return nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
运行指令go run main.go --config=config.toml
,可以看到以下运行结果
我们的gRPC
客户端通过应用名称main
从ETCD
中获取到服务地址,并监听了/wsd-reg/main
,用于后续更新服务地址。
客户端会定时1s,发送hello
给gRPC
服务端,可以收到服务端响应的``Jupiter 1类似信息到服务端
# 3.2.4.3 从零开始配置
环境要求 jupiter(commit:8b67ebec1ae6dc07e8df27d7240aa9b4d954671b)如果用的jupiter就用1.8
生成potobuf
syntax = "proto3";
package pb;
service Hello {
// SayHello
rpc SayHello(SayHelloReq) returns (SayHelloRes);
}
message SayHelloReq{
string name = 1;
}
message SayHelloRes{
string resp = 1;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
执行命令: protoc -I . --go_out=plugins=grpc:. ./hello.proto
拿到对应pb包放入对应项目的路径
go build -o jupiter-demo main.go
记得把下面的demo编译成jupiter-demo,再执行
jupiter 客户端 使用demo
# 配置
[jupiter.registry.wh]
connectTimeout = "1s"
endpoints=["127.0.0.1:2379"]
secure = false
prefix = "wsd-reg" # 前缀
[jupiter.grpc.etcdserver]
addr = "etcd:///jupiter-demo" # jupiter-demo 指的是执行文件名称,在发布平台执行文件名称跟应用名称是一样的
dialTimeout = "3s" # 默认值
package main
import (
"clientb/pb"
"context"
"fmt"
"time"
"github.com/douyu/jupiter"
"github.com/douyu/jupiter/pkg/client/grpc"
"github.com/douyu/jupiter/pkg/client/grpc/balancer"
"github.com/douyu/jupiter/pkg/client/grpc/resolver"
"github.com/douyu/jupiter/pkg/registry/etcdv3"
"github.com/douyu/jupiter/pkg/xlog"
)
func main() {
eng := NewEngine()
if err := eng.Run(); err != nil {
xlog.Error(err.Error())
}
fmt.Printf("111 = %+v\n", 111)
}
type Engine struct {
jupiter.Application
}
func NewEngine() *Engine {
eng := &Engine{}
if err := eng.Startup(
eng.initResolver,
eng.consumer,
); err != nil {
xlog.Panic("startup", xlog.Any("err", err))
}
return eng
}
func (eng *Engine) initResolver() error {
resolver.Register("etcd", etcdv3.StdConfig("wh").Build())
return nil
}
func (eng *Engine) consumer() error {
config := grpc.StdConfig("etcdserver")
config.BalancerName = balancer.NameSmoothWeightRoundRobin
client := pb.NewHelloClient(config.Build())
go func() {
i:=0
ghj := map[string]int{}
for {
i++
resp, err := client.SayHello(context.Background(), &pb.SayHelloReq{
Name: fmt.Sprintf("jupiter:%d",i),
})
if err != nil {
fmt.Printf("err = %+v\n%s", err,"iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii")
xlog.Error(err.Error())
} else {
ghj[resp.Resp] = ghj[resp.Resp] + 1
fmt.Printf("resp.Message = %+v\n", ghj)
xlog.Info("receive response", xlog.String("resp", resp.Resp))
}
time.Sleep(1 * time.Second)
}
}()
return nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
jupiter grpc 客户端 demo
# 配置
[app]
[app.registry.etcd]# 使用etcd作为服务发现
endpoints=["127.0.0.1:2379"] #etcd的地址,grpc的服务端须已经注册到这个etcd
timeout="2s"
[jupiter]
[jupiter.grpc]
[jupiter.grpc.wsg-reg] # wsg-reg 这里得注意,这是服务端的前缀
debug = true # Debug开关
enableMetric = true # 指标采集开关
enableAccessLog = true # 访问日志开关
addr = "jupiter-demo" #目标地址。direct=true,该值设为服务ip:port, direct=false,则为服务注册名,其实就是执行文件名称,也就是应用名称
dialTimeout = "1s" # 拨超时
readTimeout = "1s" # 读超时
enableTrace = false # 链路追踪开关
balancerName = "round_robin" # 默认为round_robin
level = "panic" # 创建时的告警等级,level=panic创建Client失败时panic
wait = true # 默认:true 是否一直等待直到连接建立,wait=true时,dialTimeout失效。注意Wait可能会导致创建过程阻塞
direct = false # 直连服务,不经过负载均衡器
slowThreshold = "1s" # slow日志门限值
package main
import (
"context"
"demomimi/pb"
"fmt"
"time"
)
import "github.com/douyu/jupiter/client/gusty"
// 新建demo客户端
var (
DemoClient pb.HelloClient
)
// 读取demo的grpc配置,并初始化
func init() {
DemoClient = pb.NewHelloClient(gusty.Invoker("wsg-reg"))
}
func main() {
sdfg := map[string]int{}
for{
resp,err:=SayHello()
if err !=nil{
fmt.Println(err,"uuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu")
continue
}
sdfg[resp.Resp] = sdfg[resp.Resp] + 1
fmt.Println("uuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu",sdfg)
time.Sleep(time.Second * 1)
}
}
// SayHello 调用该grpc的方法
func SayHello() (*pb.SayHelloRes, error) {
ctx := context.Background()
ctx, grpcTimeOut := context.WithTimeout(ctx, 1*time.Second)
defer grpcTimeOut()
helloRes, err := DemoClient.SayHello(
ctx,
&pb.SayHelloReq{
Name: fmt.Sprintf("word"),
},
)
if err != nil {
return &pb.SayHelloRes{}, err
}
return helloRes, nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
jupiter 服务端调用
[jupiter.server.grpc]
port = 20102
[jupiter.registry.wh]
connectTimeout = "1s"
endpoints=["127.0.0.1:2379"]
secure = false
prefix = "wsd-reg" # 这个前缀记得跟client保持一致
package main
import (
"context"
"fmt"
"github.com/douyu/jupiter"
compound_registry "github.com/douyu/jupiter/pkg/registry/compound"
etcdv3_registry "github.com/douyu/jupiter/pkg/registry/etcdv3"
"github.com/douyu/jupiter/pkg/server/xgrpc"
"github.com/douyu/jupiter/pkg/xlog"
"google.golang.org/grpc/examples/helloworld/helloworld"
)
func main() {
eng := NewEngine()
eng.SetRegistry(
compound_registry.New(
etcdv3_registry.StdConfig("wh").Build(),
),
)
//eng.SetGovernor("0.0.0.0:0")
if err := eng.Run(); err != nil {
xlog.Error(err.Error())
}
}
type Engine struct {
jupiter.Application
}
func NewEngine() *Engine {
eng := &Engine{}
if err := eng.Startup(
eng.serveGRPC,
); err != nil {
xlog.Panic("startup", xlog.Any("err", err))
}
return eng
}
func (eng *Engine) serveGRPC() error {
fmt.Println("YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY")
server := xgrpc.StdConfig("grpc").Build()
helloworld.RegisterGreeterServer(server.Server, new(Greeter))
return eng.Serve(server)
}
type Greeter struct {
server *xgrpc.Server
}
func (Greeter) SayHello(context context.Context, request *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
sd := &helloworld.HelloReply{
Message: "我是client_b",
}
fmt.Println(fmt.Sprintf("name:%s",request.Name))
return sd,nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
← 3.1 HTTP 3.3 Worker →