# 3.2 gRPC

# 3.2.1 gRPC介绍

Jupiter微服务目前支持gRPCJupitergRPC服务提供了很多可观察性的手段。

内置了多个中间件,可以采集请求日志、采集trace、采集监控、采集慢日志,更加方便我们对gRPC服务的可观测。

通过govern的治理端口,能够查看监控、HTTP实时信息

# 3.2.2 配置规范

配置说明

# 3.2.3 直连的gRPC

参考gRPC直连示例

# 3.2.3.1 启动gRPC服务

配置项

[jupiter.server.grpc]
    port = 9091
1
2

代码

func main() {
	eng := NewEngine()
	eng.SetGovernor("127.0.0.1:9092")
	if err := eng.Run(); err != nil {
		xlog.Panic(err.Error())
	}
}

type Engine struct {
	jupiter.Application
}

func NewEngine() *Engine {
	eng := &Engine{}

	if err := eng.Startup(
		eng.serveGRPC,
	); err != nil {
		xlog.Panic("startup", xlog.Any("err", err))
	}
	return eng
}

func (eng *Engine) serveGRPC() error {
	server := xgrpc.StdConfig("grpc").Build()
	helloworld.RegisterGreeterServer(server.Server, new(Greeter))
	return eng.Serve(server)
}

type Greeter struct{}

func (g Greeter) SayHello(context context.Context, request *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
	return &helloworld.HelloReply{
		Message: "Hello Jupiter",
	}, nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

运行指令go run main.go --config=config.toml,可以看到以下运行结果 image 从图中可以看到,我们启动了一个gRPC服务运行在9091端口,接下来我们启动客户端

# 3.2.3.2 启动gRPC客户端

配置项

[jupiter.client.directserver]
    address = "127.0.0.1:9091"
    balancerName = "round_robin" # 默认值
    block =  false # 默认值
    dialTimeout = "0s" # 默认值

1
2
3
4
5
6

代码


func main() {
	eng := NewEngine()
	if err := eng.Run(); err != nil {
		xlog.Error(err.Error())
	}
}

type Engine struct {
	jupiter.Application
}

func NewEngine() *Engine {
	eng := &Engine{}
	if err := eng.Startup(
		eng.consumer,
	); err != nil {
		xlog.Panic("startup", xlog.Any("err", err))
	}
	return eng
}

func (eng *Engine) consumer() error {
	conn := grpc.StdConfig("directserver").Build()
	client := helloworld.NewGreeterClient(conn)
	for {
		resp, err := client.SayHello(context.Background(), &helloworld.HelloRequest{
			Name: "jupiter",
		})
		if err != nil {
			xlog.Error(err.Error())
		} else {
			xlog.Info("receive response", xlog.String("resp", resp.Message))
		}
		time.Sleep(1 * time.Second)
	}
	return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

我们的gRPC客户端通过配置里的地址和负载均衡算法,可以请求刚才我们启动的gRPC服务端。运行指令go run main.go --config=config.toml,可以看到以下运行结果 image 我们定时1s,发送hellogRPC服务端,可以收到服务端响应的Hello Jupiter

# 3.2.4 注册ETCD的gRPC服务

参考gRPC注册ETCD示例

# 3.2.4.2 启动gRPC服务

配置项

[jupiter.server.grpc]
    port = 9091  # 服务端grpc绑定端口
[jupiter.registry.wh]  # 注册grpc到etcd的配置
    connectTimeout = "1s"
    endpoints=["127.0.0.1:2379"]  # grpc注册到目标etcd中
    secure = false
    prefix = "wsd-reg" # 服务端注册到etcd的key前缀,配置客户端时候应该保持一致
1
2
3
4
5
6
7

代码

package main

import (
    "context"
    "fmt"
    "github.com/douyu/jupiter"
    compound_registry "github.com/douyu/jupiter/pkg/registry/compound"
    etcdv3_registry "github.com/douyu/jupiter/pkg/registry/etcdv3"
    "github.com/douyu/jupiter/pkg/server/xgrpc"
    "github.com/douyu/jupiter/pkg/xlog"
    "google.golang.org/grpc/examples/helloworld/helloworld"
)

func main() {
    eng := NewEngine()
    eng.SetRegistry(
        compound_registry.New(
            etcdv3_registry.StdConfig("wh").Build(),
        ),
    )
    //eng.SetGovernor("0.0.0.0:0")
    if err := eng.Run(); err != nil {
        xlog.Error(err.Error())
    }
}

type Engine struct {
    jupiter.Application
}

func NewEngine() *Engine {
    eng := &Engine{}
    if err := eng.Startup(
        eng.serveGRPC,
    ); err != nil {
        xlog.Panic("startup", xlog.Any("err", err))
    }
    return eng
}

func (eng *Engine) serveGRPC() error {

    server := xgrpc.StdConfig("grpc").Build()
    helloworld.RegisterGreeterServer(server.Server, new(Greeter))
    return eng.Serve(server)
}

type Greeter struct {
    server *xgrpc.Server
}


func (Greeter) SayHello(context context.Context, request *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
    sd := &helloworld.HelloReply{
        Message: "返回信息给client",
    }

    fmt.Println(fmt.Sprintf("name:%s",request.Name))
    return sd,nil
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

运行指令go run main.go --config=config.toml,可以看到以下运行结果 image 从图中可以看到,我们启动了一个gRPC服务运行在9091端口,在命令行的第四行,展示了我们注册的keyvalue信息。接下来我们在启动客户端。

# 3.2.4.2 启动gRPC客户端

配置项

[jupiter.registry.wh]
    connectTimeout = "1s"
    endpoints=["127.0.0.1:2379"]
    secure = false
    prefix = "wsd-reg" # 服务端注册到etcd的key前缀,配置客户端时候应该保持一致

[jupiter.client.etcdserver]
    address = "etcd:///main" #etcd:/// 默认前缀, main 指的是应用执行二进制文件名称(发布平台将它默认为应用名称). 框架内部会把 main解析出来跟前缀做拼接,去etcd找到对应grpc服务端注册key
    balancerName = "round_robin" # 默认值,grpc客户端调用服务端采用的 轮训模式
    block =  false # 默认值
    dialTimeout = "0s" # 默认值
    
1
2
3
4
5
6
7
8
9
10
11
12

grpc客户端代码demo

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/douyu/jupiter"
    "github.com/douyu/jupiter/pkg/client/grpc"
    "github.com/douyu/jupiter/pkg/client/grpc/balancer"
    "github.com/douyu/jupiter/pkg/client/grpc/resolver"
    "github.com/douyu/jupiter/pkg/registry/etcdv3"
    "github.com/douyu/jupiter/pkg/xlog"
    "google.golang.org/grpc/examples/helloworld/helloworld"
)

func main() {
    eng := NewEngine()
    if err := eng.Run(); err != nil {
        xlog.Error(err.Error())
    }
    fmt.Printf("111 = %+v\n", 111)
}

type Engine struct {
    jupiter.Application
}

func NewEngine() *Engine {
    eng := &Engine{}
    if err := eng.Startup(
        eng.initResolver,
        eng.consumer,
    ); err != nil {
        xlog.Panic("startup", xlog.Any("err", err))
    }
    return eng
}

func (eng *Engine) initResolver() error {
    resolver.Register("etcd", etcdv3.StdConfig("wh").Build())
    return nil
}

func (eng *Engine) consumer() error {
    config := grpc.StdConfig("etcdserver")
    //config.BalancerName = balancer.NameSmoothWeightRoundRobin
    client := helloworld.NewGreeterClient(config.Build())

    go func() {
        i:=0
        ghj := map[string]int{}
        for {

            i++
            resp, err := client.SayHello(context.Background(), &helloworld.HelloRequest{
                Name: fmt.Sprintf("jupiter:%d",i),
            })
            if err != nil {
                fmt.Printf("err = %+v\n%s", err,"iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii")
                xlog.Error(err.Error())
            } else {
                ghj[resp.Message] = ghj[resp.Message] + 1
                fmt.Printf("resp.Message = %+v\n", ghj)
                xlog.Info("receive response", xlog.String("resp", resp.Message))
            }
            time.Sleep(1 * time.Second)
        }
    }()
    return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71

运行指令go run main.go --config=config.toml,可以看到以下运行结果 image 我们的gRPC客户端通过应用名称mainETCD中获取到服务地址,并监听了/wsd-reg/main,用于后续更新服务地址。

客户端会定时1s,发送hellogRPC服务端,可以收到服务端响应的``Jupiter 1类似信息到服务端

# 3.2.4.3 从零开始配置

环境要求 jupiter(commit:8b67ebec1ae6dc07e8df27d7240aa9b4d954671b)如果用的minerva就用1.8

生成potobuf

syntax = "proto3";
package pb;

service Hello {
  // SayHello
  rpc SayHello(SayHelloReq) returns (SayHelloRes);
}

message SayHelloReq{
  string name = 1;
}

message SayHelloRes{
  string resp = 1;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

执行命令: protoc -I . --go_out=plugins=grpc:. ./hello.proto 拿到对应pb包放入对应项目的路径

go build -o jupiter-demo main.go 记得把下面的demo编译成jupiter-demo,再执行 jupiter 客户端 使用demo

# 配置
[jupiter.registry.wh]
connectTimeout = "1s"
endpoints=["127.0.0.1:2379"]
secure = false
prefix = "wsd-reg"  # 前缀

[jupiter.client.etcdserver]
address = "etcd:///jupiter-demo" # jupiter-demo 指的是执行文件名称,在发布平台执行文件名称跟应用名称是一样的
block = true # 默认值
dialTimeout = "0s" # 默认值

package main

import (
	"clientb/pb"
	"context"
	"fmt"
	"time"

	"github.com/douyu/jupiter"
	"github.com/douyu/jupiter/pkg/client/grpc"
	"github.com/douyu/jupiter/pkg/client/grpc/balancer"
	"github.com/douyu/jupiter/pkg/client/grpc/resolver"
	"github.com/douyu/jupiter/pkg/registry/etcdv3"
	"github.com/douyu/jupiter/pkg/xlog"
)

func main() {
	eng := NewEngine()
	if err := eng.Run(); err != nil {
		xlog.Error(err.Error())
	}
	fmt.Printf("111 = %+v\n", 111)
}

type Engine struct {
	jupiter.Application
}

func NewEngine() *Engine {
	eng := &Engine{}
	if err := eng.Startup(
		eng.initResolver,
		eng.consumer,
	); err != nil {
		xlog.Panic("startup", xlog.Any("err", err))
	}
	return eng
}

func (eng *Engine) initResolver() error {
	resolver.Register("etcd", etcdv3.StdConfig("wh").Build())
	return nil
}

func (eng *Engine) consumer() error {
	config := grpc.StdConfig("etcdserver")
	config.BalancerName = balancer.NameSmoothWeightRoundRobin
	client := pb.NewHelloClient(config.Build())
	go func() {
		i:=0
		ghj := map[string]int{}
		for {

			i++
			resp, err := client.SayHello(context.Background(), &pb.SayHelloReq{
				Name: fmt.Sprintf("jupiter:%d",i),
			})
			if err != nil {
				fmt.Printf("err = %+v\n%s", err,"iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii")
				xlog.Error(err.Error())
			} else {
				ghj[resp.Resp] = ghj[resp.Resp] + 1
				fmt.Printf("resp.Message = %+v\n", ghj)
				xlog.Info("receive response", xlog.String("resp", resp.Resp))
			}
			time.Sleep(1 * time.Second)
		}
	}()
	return nil
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83

minerva 客户端 demo

# 配置
[app]
[app.registry.etcd]# 使用etcd作为服务发现
endpoints=["127.0.0.1:2379"] #etcd的地址,grpc的服务端须已经注册到这个etcd
timeout="2s"
[minerva]
[minerva.grpc]
[minerva.grpc.wsg-reg] # wsg-reg 这里得注意,这是服务端的前缀
debug = true # Debug开关
enableMetric = true # 指标采集开关
enableAccessLog = true # 访问日志开关
addr = "jupiter-demo" #目标地址。direct=true,该值设为服务ip:port, direct=false,则为服务注册名,其实就是执行文件名称,也就是应用名称
dialTimeout = "1s" # 拨超时
readTimeout = "1s" # 读超时
enableTrace = false # 链路追踪开关
balancerName = "round_robin" # 默认为round_robin
level = "panic" # 创建时的告警等级,level=panic创建Client失败时panic
wait = true # 默认:true 是否一直等待直到连接建立,wait=true时,dialTimeout失效。注意Wait可能会导致创建过程阻塞
direct = false # 直连服务,不经过负载均衡器
slowThreshold = "1s" # slow日志门限值


package main

import (
	"context"
	"demomimi/pb"
	"fmt"
	"time"
)
import "git.xxxx.com/vega/minerva/client/gusty"
// 新建demo客户端
var (
	DemoClient pb.HelloClient
)

// 读取demo的grpc配置,并初始化
func init() {

	DemoClient = pb.NewHelloClient(gusty.Invoker("wsg-reg"))
}

func main()  {
	sdfg := map[string]int{}
	for{
		resp,err:=SayHello()
		if err !=nil{
			fmt.Println(err,"uuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu")
			continue
		}
		sdfg[resp.Resp] = sdfg[resp.Resp] + 1
		fmt.Println("uuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuuu",sdfg)
		time.Sleep(time.Second * 1)
	}

}


// SayHello 调用该grpc的方法
func SayHello() (*pb.SayHelloRes, error) {
	ctx := context.Background()
	ctx, grpcTimeOut := context.WithTimeout(ctx, 1*time.Second)
	defer grpcTimeOut()
	helloRes, err := DemoClient.SayHello(
		ctx,
		&pb.SayHelloReq{
			Name: fmt.Sprintf("word"),
		},
	)
	if err != nil {
		return &pb.SayHelloRes{}, err
	}
	return helloRes, nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74

jupiter 服务端调用


[jupiter.server.grpc]
    port = 20102
[jupiter.registry.wh]
connectTimeout = "1s"
endpoints=["127.0.0.1:2379"]
secure = false
prefix = "wsd-reg" # 这个前缀记得跟client保持一致



package main

import (
	"context"
	"fmt"
	"github.com/douyu/jupiter"
	compound_registry "github.com/douyu/jupiter/pkg/registry/compound"
	etcdv3_registry "github.com/douyu/jupiter/pkg/registry/etcdv3"
	"github.com/douyu/jupiter/pkg/server/xgrpc"
	"github.com/douyu/jupiter/pkg/xlog"
	"google.golang.org/grpc/examples/helloworld/helloworld"
)

func main() {
	eng := NewEngine()
	eng.SetRegistry(
		compound_registry.New(
			etcdv3_registry.StdConfig("wh").Build(),
		),
	)
	//eng.SetGovernor("0.0.0.0:0")
	if err := eng.Run(); err != nil {
		xlog.Error(err.Error())
	}
}

type Engine struct {
	jupiter.Application
}

func NewEngine() *Engine {
	eng := &Engine{}
	if err := eng.Startup(
		eng.serveGRPC,
	); err != nil {
		xlog.Panic("startup", xlog.Any("err", err))
	}
	return eng
}

func (eng *Engine) serveGRPC() error {
	fmt.Println("YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY")
	server := xgrpc.StdConfig("grpc").Build()
	helloworld.RegisterGreeterServer(server.Server, new(Greeter))
	return eng.Serve(server)
}


type Greeter struct {
	server *xgrpc.Server
}
func (Greeter) SayHello(context context.Context, request *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
	sd := &helloworld.HelloReply{
		Message: "我是client_b",
	}
	fmt.Println(fmt.Sprintf("name:%s",request.Name))
	return sd,nil
}



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72

minerva 服务端 demo

[app]
mode="local"
[app.registry]
[app.registry.etcd]
endPoints = ["127.0.0.1:2379"]
timeout = "2s"
[server.grpc]
port=9001
name="demo"
[server.grpc.labels]
group = "default" # default: default
weight = "10" # default: 100
enable = "true" # default: true



package main

import (
	"context"
	"fmt"
	"git.xxxx.com/vega/minerva"
	"git.xxxx.com/vega/minerva/application"
	"git.xxxx.com/vega/minerva/server"
	"git.xxxx.com/vega/minerva/server/yell"
	"mimi/pb"
)

// 定义handler
type HelloHandler struct {
	yell.Handler
}


// 定义方法
func (s *HelloHandler) SayHello(ctx context.Context, in *pb.SayHelloReq) (out *pb.SayHelloRes, err error) {
	fmt.Println(in.Name,"nnnnnnnn")
	return &pb.SayHelloRes{Resp:"hello1"},nil
}


// 定义一个grpc server,应用微服务的理念,一个服务只能定义一个server
type GrpcServer struct {
	*yell.Server
}


// 将HelloServer注册到server
func (s *GrpcServer) Mux() {
	s.Register(pb.RegisterHelloServer, new(HelloHandler))
	// 这里可以继续注册其他的Handler
}

// main函数中启动grpc server
func main() {
	fmt.Println(application.BuildFlags())
	app := minerva.NewAPP()
	app.Serve(
		new(GrpcServer),
		server.StdConfig("grpc"),
		server.Host("127.0.0.1"),
	)
	app.Run() 
}
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65