Skip to content

Commit

Permalink
feat: cluster-resource 支持redis哨兵和集群模式
Browse files Browse the repository at this point in the history
  • Loading branch information
yuyudeqiu committed Sep 30, 2024
1 parent efdc804 commit a041d22
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 84 deletions.
4 changes: 2 additions & 2 deletions bcs-services/cluster-resources/cmd/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ func (c *DependencyServiceChecker) DoAndExit() {
// doOnce 对依赖服务进行一次检查,任意服务不可用,都返回错误
func (c *DependencyServiceChecker) doOnce() error {
// 检查 Redis 服务,若服务异常,则返回错误
rds := redis.NewStandaloneClient(&c.conf.Redis)
if _, err := rds.Ping(context.TODO()).Result(); err != nil {
rds, err := redis.NewRedisClient(&c.conf.Redis)
if _, err = rds.Ping(context.TODO()); err != nil {
return err
}

Expand Down
4 changes: 4 additions & 0 deletions bcs-services/cluster-resources/etc/conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,14 @@ log:

# Redis 配置信息
redis:
# 地址列表用 , 分隔多个节点
address: "127.0.0.1:6379"
db: 0
password: ""
# 以下项非必须可不启用
# redis 的模式 默认使用
# redisMode: single # 可选填参数: single sentinel cluster
# masterName: master
# dialTimeout: 2
# readTimeout: 1
# writeTimeout: 1
Expand Down
8 changes: 4 additions & 4 deletions bcs-services/cluster-resources/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ require (
// fork 自 https://github.com/signalfx/splunk-otel-go/tree/main/instrumentation/k8s.io/client-go/splunkclient-go
// 待升级到 go1.18 后,可直接引用社区的包
github.com/adevjoe/opentelemetry-go-contrib/instrumentation/k8s.io/client-go v1.0.2
github.com/alicebob/miniredis v2.5.0+incompatible
github.com/alicebob/miniredis v2.5.0+incompatible // indirect
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
github.com/dustin/go-humanize v1.0.0
github.com/envoyproxy/protoc-gen-validate v1.0.4
github.com/fatih/structs v1.1.0
github.com/go-micro/plugins/v4/registry/etcd v1.1.0
github.com/go-micro/plugins/v4/server/grpc v1.2.0
github.com/go-redis/cache/v8 v8.4.3
github.com/go-redis/redis/v8 v8.11.5
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/go-resty/resty/v2 v2.12.0
github.com/golang-jwt/jwt/v4 v4.5.0
github.com/golang/glog v1.2.0
Expand Down Expand Up @@ -138,7 +138,7 @@ require (
github.com/gobwas/ws v1.0.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/gomodule/redigo v1.8.9 // indirect
github.com/gomodule/redigo v1.9.2 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/gorilla/handlers v1.5.1 // indirect
Expand Down Expand Up @@ -185,7 +185,7 @@ require (
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
github.com/yuin/gopher-lua v1.1.0 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
go.etcd.io/etcd/api/v3 v3.5.2 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.2 // indirect
go.etcd.io/etcd/client/v3 v3.5.2 // indirect
Expand Down
84 changes: 53 additions & 31 deletions bcs-services/cluster-resources/pkg/cache/redis/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ import (
"sync"
"time"

redisotel2 "github.com/go-redis/redis/extra/redisotel/v8"
"github.com/go-redis/redis/v8"

"github.com/Tencent/bk-bcs/bcs-common/common/redisclient"
"github.com/Tencent/bk-bcs/bcs-services/cluster-resources/pkg/common/runmode"
crRuntime "github.com/Tencent/bk-bcs/bcs-services/cluster-resources/pkg/common/runtime"
"github.com/Tencent/bk-bcs/bcs-services/cluster-resources/pkg/config"
log "github.com/Tencent/bk-bcs/bcs-services/cluster-resources/pkg/logging"
"github.com/Tencent/bk-bcs/bcs-services/cluster-resources/pkg/util/stringx"
redisotel2 "github.com/go-redis/redis/extra/redisotel/v8"
)

var rds *redis.Client
var rds redisclient.Client

var initOnce sync.Once

Expand All @@ -45,46 +45,61 @@ const (
minIdleConnsMultiple = 10
// idleTimeout unit: min
idleTimeout = 3
// Sentinel mode
Sentinel = "sentinel"
// Cluster mode
Cluster = "cluster"
// Single mode
Single = "single"
)

// NewStandaloneClient 创建单实例模式 RedisClient(非哨兵模式)
func NewStandaloneClient(conf *config.RedisConf) *redis.Client {
opt := &redis.Options{
Addr: conf.Address,
Password: conf.Password,
// NewRedisClient 根据配置创建不同模式的 RedisClient
func NewRedisClient(conf *config.RedisConf) (redisclient.Client, error) {
clientConf := redisclient.Config{
Addrs: stringx.Split(conf.Address),
DB: conf.DB,
Password: conf.Password,
// 默认配置
DialTimeout: time.Duration(dialTimeout) * time.Second,
ReadTimeout: time.Duration(readTimeout) * time.Second,
WriteTimeout: time.Duration(writeTimeout) * time.Second,
PoolSize: pollSizeMultiple * runtime.NumCPU(),
MinIdleConns: minIdleConnsMultiple * runtime.NumCPU(),
IdleTimeout: time.Duration(idleTimeout) * time.Minute,
}

// 默认配置
opt.DialTimeout = time.Duration(dialTimeout) * time.Second
opt.ReadTimeout = time.Duration(readTimeout) * time.Second
opt.WriteTimeout = time.Duration(writeTimeout) * time.Second
opt.PoolSize = pollSizeMultiple * runtime.NumCPU()
opt.MinIdleConns = minIdleConnsMultiple * runtime.NumCPU()
opt.IdleTimeout = time.Duration(idleTimeout) * time.Minute

// 若配置中指定,则使用
if conf.DialTimeout > 0 {
opt.DialTimeout = time.Duration(conf.DialTimeout) * time.Second
clientConf.DialTimeout = time.Duration(conf.DialTimeout) * time.Second
}
if conf.ReadTimeout > 0 {
opt.ReadTimeout = time.Duration(conf.ReadTimeout) * time.Second
clientConf.ReadTimeout = time.Duration(conf.ReadTimeout) * time.Second
}
if conf.WriteTimeout > 0 {
opt.WriteTimeout = time.Duration(conf.WriteTimeout) * time.Second
clientConf.WriteTimeout = time.Duration(conf.WriteTimeout) * time.Second
}
if conf.PoolSize > 0 {
opt.PoolSize = conf.PoolSize
clientConf.PoolSize = conf.PoolSize
}
if conf.MinIdleConns > 0 {
opt.MinIdleConns = conf.MinIdleConns
clientConf.MinIdleConns = conf.MinIdleConns
}

switch conf.RedisMode {
case Sentinel:
clientConf.Mode = redisclient.SentinelMode
clientConf.MasterName = conf.MasterName
case Cluster:
clientConf.Mode = redisclient.ClusterMode
default:
clientConf.Mode = redisclient.SingleMode
}

log.Info(context.TODO(),
"start connect redis: %s [db=%d, dialTimeout=%s, readTimeout=%s, writeTimeout=%s, poolSize=%d, minIdleConns=%d, idleTimeout=%s]", //nolint:lll
opt.Addr, opt.DB, opt.DialTimeout, opt.ReadTimeout, opt.WriteTimeout, opt.PoolSize, opt.MinIdleConns, opt.IdleTimeout)
"start connect redis: %v [mode=%s db=%d, dialTimeout=%s, readTimeout=%s, writeTimeout=%s, poolSize=%d, minIdleConns=%d, idleTimeout=%s]", //nolint:lll
clientConf.Addrs, clientConf.DB, clientConf.Mode, clientConf.DialTimeout, clientConf.ReadTimeout, clientConf.WriteTimeout, clientConf.PoolSize, clientConf.MinIdleConns, clientConf.IdleTimeout)

return redis.NewClient(opt)
return redisclient.NewClient(clientConf)
}

// InitRedisClient 初始化 Redis 客户端
Expand All @@ -93,23 +108,30 @@ func InitRedisClient(conf *config.RedisConf) {
return
}
initOnce.Do(func() {
rds = NewStandaloneClient(conf)
rds.AddHook(redisotel2.NewTracingHook())
var err error
// 初始化失败,panic
if rds, err = NewRedisClient(conf); err != nil {
panic(err)
}
rds.GetCli().AddHook(redisotel2.NewTracingHook())
// 若 Redis 服务异常,应重置 rds 并 panic
if _, err := rds.Ping(context.TODO()).Result(); err != nil {
if _, err = rds.Ping(context.TODO()); err != nil {
rds = nil
panic(err)
}
})
}

// GetDefaultClient 获取默认 Redis 客户端
func GetDefaultClient() *redis.Client {
func GetDefaultClient() redisclient.Client {
if rds == nil {
// 单元测试模式下,自动启用测试用 Redis,否则需要提前初始化
if crRuntime.RunMode == runmode.UnitTest {
rds = NewTestRedisClient()
rds.AddHook(redisotel2.NewTracingHook())
var err error
if rds, err = redisclient.NewTestClient(); err != nil {
panic(err)
}
rds.GetCli().AddHook(redisotel2.NewTracingHook())
return rds
}
panic("prod and stag run mode need init redis!")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ func TestGetDefaultClient(t *testing.T) {
rdsCli := GetDefaultClient()
assert.NotNil(t, rdsCli)

ret, _ := rds.Ping(context.TODO()).Result()
ret, _ := rds.Ping(context.TODO())
assert.Equal(t, "PONG", ret)
}
25 changes: 12 additions & 13 deletions bcs-services/cluster-resources/pkg/cache/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ import (
"fmt"
"time"

"github.com/go-redis/cache/v8"
"github.com/go-redis/redis/v8"

"github.com/Tencent/bk-bcs/bcs-common/common/redisclient"
crCache "github.com/Tencent/bk-bcs/bcs-services/cluster-resources/pkg/cache"
"github.com/go-redis/cache/v8"
)

const (
Expand All @@ -31,11 +30,11 @@ const (

// Cache 缓存实例
type Cache struct {
name string // 缓存键名
keyPrefix string // 缓存键前缀
codec *cache.Cache // go-redis cache
cli *redis.Client // redis client
exp time.Duration // 默认过期时间
name string // 缓存键名
keyPrefix string // 缓存键前缀
codec *cache.Cache // go-redis cache
cli redisclient.Client // redis client
exp time.Duration // 默认过期时间
}

// NewCache 新建 cache 实例
Expand All @@ -46,7 +45,7 @@ func NewCache(name string, expiration time.Duration) *Cache {
keyPrefix := fmt.Sprintf("%s:%s", CacheKeyPrefix, name)

codec := cache.New(&cache.Options{
Redis: cli,
Redis: cli.GetCli(),
})

return &Cache{
Expand Down Expand Up @@ -80,7 +79,7 @@ func (c *Cache) Set(ctx context.Context, key crCache.Key, value interface{}, dur
// Exists 检查 key 在 redis 中是否存在
func (c *Cache) Exists(ctx context.Context, key crCache.Key) bool {
k := c.genKey(key.Key())
count, err := c.cli.Exists(ctx, k).Result()
count, err := c.cli.Exists(ctx, k)
return err == nil && count == 1
}

Expand All @@ -93,15 +92,15 @@ func (c *Cache) Get(ctx context.Context, key crCache.Key, value interface{}) err
// Delete 从 redis 中删除指定的键
func (c *Cache) Delete(ctx context.Context, key crCache.Key) error {
k := c.genKey(key.Key())
_, err := c.cli.Del(ctx, k).Result()
_, err := c.cli.Del(ctx, k)
return err
}

// DeleteByPrefix 根据键前缀删除缓存,慎用!
func (c *Cache) DeleteByPrefix(ctx context.Context, prefix string) error {
iter := c.cli.Scan(ctx, 0, c.genKey(prefix)+"*", 0).Iterator()
iter := c.cli.GetCli().Scan(ctx, 0, c.genKey(prefix)+"*", 0).Iterator()
for iter.Next(ctx) {
if err := c.cli.Del(ctx, iter.Val()).Err(); err != nil {
if _, err := c.cli.Del(ctx, iter.Val()); err != nil {
return err
}
}
Expand Down
32 changes: 0 additions & 32 deletions bcs-services/cluster-resources/pkg/cache/redis/testing.go

This file was deleted.

2 changes: 2 additions & 0 deletions bcs-services/cluster-resources/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ type RedisConf struct {
Address string `yaml:"address" usage:"Redis Server Address"`
DB int `yaml:"db" usage:"Redis DB"`
Password string `yaml:"password" usage:"Redis Password"`
RedisMode string `yaml:"redisMode" usage:"Redis Mode"`
MasterName string `yaml:"masterName" usage:"Redis MasterName for Sentinel Mode"`
DialTimeout int `yaml:"dialTimeout" usage:"Redis Dial Timeout"`
ReadTimeout int `yaml:"readTimeout" usage:"Redis Read Timeout(s)"`
WriteTimeout int `yaml:"writeTimeout" usage:"Redis Write Timeout(s)"`
Expand Down
2 changes: 1 addition & 1 deletion bcs-services/cluster-resources/pkg/handler/basic/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (h *Handler) Healthz(
allOK := true

// 检查 redis 状态
ret, err := redis.GetDefaultClient().Ping(ctx).Result()
ret, err := redis.GetDefaultClient().Ping(ctx)
if ret != "PONG" || err != nil {
resp.Redis = genHealthzStatus(false, "Redis Ping Failed")
allOK = false
Expand Down

0 comments on commit a041d22

Please sign in to comment.