From efdc804ab0dbc2e083e8adbd633bc1dbe6781a1f Mon Sep 17 00:00:00 2001 From: carlchen Date: Sun, 29 Sep 2024 13:55:27 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feat:=20common=20=E6=B7=BB=E5=8A=A0=20redis?= =?UTF-8?q?=20sdk=20=E6=94=AF=E6=8C=81=E5=93=A8=E5=85=B5=E5=92=8C=E9=9B=86?= =?UTF-8?q?=E7=BE=A4=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bcs-common/common/redisclient/cluster.go | 62 +++++++++ bcs-common/common/redisclient/cluster_test.go | 58 +++++++++ bcs-common/common/redisclient/config.go | 20 +++ bcs-common/common/redisclient/sentinel.go | 68 ++++++++++ .../common/redisclient/sentinel_test.go | 61 +++++++++ bcs-common/common/redisclient/single.go | 81 ++++++++++++ bcs-common/common/redisclient/single_test.go | 119 ++++++++++++++++++ bcs-common/common/redisclient/types.go | 55 ++++++++ bcs-common/go.mod | 6 + 9 files changed, 530 insertions(+) create mode 100644 bcs-common/common/redisclient/cluster.go create mode 100644 bcs-common/common/redisclient/cluster_test.go create mode 100644 bcs-common/common/redisclient/config.go create mode 100644 bcs-common/common/redisclient/sentinel.go create mode 100644 bcs-common/common/redisclient/sentinel_test.go create mode 100644 bcs-common/common/redisclient/single.go create mode 100644 bcs-common/common/redisclient/single_test.go create mode 100644 bcs-common/common/redisclient/types.go diff --git a/bcs-common/common/redisclient/cluster.go b/bcs-common/common/redisclient/cluster.go new file mode 100644 index 0000000000..17117be8d8 --- /dev/null +++ b/bcs-common/common/redisclient/cluster.go @@ -0,0 +1,62 @@ +package redisclient + +import ( + "context" + "time" + + "github.com/go-redis/redis/v8" +) + +type ClusterClient struct { + cli *redis.ClusterClient +} + +func NewClusterClient(config Config) (*ClusterClient, error) { + cli := redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: config.Addrs, + Password: config.Password, + DialTimeout: config.DialTimeout * time.Second, + ReadTimeout: config.ReadTimeout * time.Second, + WriteTimeout: config.WriteTimeout * time.Second, + PoolSize: config.PoolSize, + MinIdleConns: config.MinIdleConns, + IdleTimeout: config.IdleTimeout * time.Second, + }) + return &ClusterClient{cli: cli}, nil +} + +func (c *ClusterClient) GetCli() redis.UniversalClient { + return c.cli +} + +func (c *ClusterClient) Ping(ctx context.Context) (string, error) { + return c.cli.Ping(ctx).Result() +} + +func (c *ClusterClient) Get(ctx context.Context, key string) (string, error) { + return c.cli.Get(ctx, key).Result() +} + +func (c *ClusterClient) Exists(ctx context.Context, key ...string) (int64, error) { + return c.cli.Exists(ctx, key...).Result() +} + +func (c *ClusterClient) Set(ctx context.Context, key string, value interface{}, duration time.Duration) (string, error) { + return c.cli.Set(ctx, key, value, duration).Result() +} + +func (c *ClusterClient) SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (bool, error) { + return c.cli.SetNX(ctx, key, value, expiration).Result() +} + +func (c *ClusterClient) SetEX(ctx context.Context, key string, value interface{}, expiration time.Duration) (string, error) { + return c.cli.SetEX(ctx, key, value, expiration).Result() +} + +func (c *ClusterClient) Del(ctx context.Context, key string) (int64, error) { + return c.cli.Del(ctx, key).Result() +} + +func (c *ClusterClient) Expire(ctx context.Context, key string, duration time.Duration) (bool, error) { + return c.cli.Expire(ctx, key, duration).Result() +} diff --git a/bcs-common/common/redisclient/cluster_test.go b/bcs-common/common/redisclient/cluster_test.go new file mode 100644 index 0000000000..31273f241e --- /dev/null +++ b/bcs-common/common/redisclient/cluster_test.go @@ -0,0 +1,58 @@ +package redisclient + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +// setupClusterClient function for initializing Redis ClusterClient +func setupClusterClient(t *testing.T) *ClusterClient { + config := Config{ + Mode: ClusterMode, + Addrs: []string{"127.0.0.1:7021", "127.0.0.1:7022", "127.0.0.1:7023"}, + } + client, err := NewClusterClient(config) + assert.NoError(t, err) + assert.NotNil(t, client) + return client +} + +// TestClusterPing tests ClusterClient connectivity +func TestClusterPing(t *testing.T) { + client := setupClusterClient(t) + result, err := client.GetCli().Ping(context.TODO()).Result() + assert.NoError(t, err) + assert.Equal(t, "PONG", result) +} + +// TestClusterClient tests ClusterClient basic functionality +func TestClusterClient(t *testing.T) { + client := setupClusterClient(t) + ctx := context.Background() + + // Test Set operation + _, err := client.Set(ctx, "key1", "value1", 10*time.Second) + assert.NoError(t, err) + + // Test Get operation + val, err := client.Get(ctx, "key1") + assert.NoError(t, err) + assert.Equal(t, "value1", val) + + // Test Exists operation + exists, err := client.Exists(ctx, "key1") + assert.NoError(t, err) + assert.Equal(t, int64(1), exists) + + // Test Del operation + _, err = client.Del(ctx, "key1") + assert.NoError(t, err) + + // Test if the key has been deleted + exists, err = client.Exists(ctx, "key1") + assert.NoError(t, err) + assert.Equal(t, int64(0), exists) +} diff --git a/bcs-common/common/redisclient/config.go b/bcs-common/common/redisclient/config.go new file mode 100644 index 0000000000..19faac41df --- /dev/null +++ b/bcs-common/common/redisclient/config.go @@ -0,0 +1,20 @@ +package redisclient + +import "time" + +// Config 初始化 Redis 客户端需要使用的 +type Config struct { + Addrs []string // 节点列表 + MasterName string // 哨兵模式下的主节点名 + Password string // 密码 + DB int // 单节点模式下的数据库 + Mode RedisMode // single, sentinel, cluster + + // Options configs + DialTimeout time.Duration + ReadTimeout time.Duration + WriteTimeout time.Duration + PoolSize int + MinIdleConns int + IdleTimeout time.Duration +} diff --git a/bcs-common/common/redisclient/sentinel.go b/bcs-common/common/redisclient/sentinel.go new file mode 100644 index 0000000000..c9ae615ff7 --- /dev/null +++ b/bcs-common/common/redisclient/sentinel.go @@ -0,0 +1,68 @@ +package redisclient + +import ( + "context" + "errors" + "time" + + "github.com/go-redis/redis/v8" +) + +type SentinelClient struct { + cli *redis.Client +} + +func NewSentinelClient(config Config) (*SentinelClient, error) { + if config.Mode != SentinelMode { + return nil, errors.New("redis mode not supported") + } + cli := redis.NewFailoverClient(&redis.FailoverOptions{ + MasterName: config.MasterName, + SentinelAddrs: config.Addrs, + Password: config.Password, + DB: config.DB, + DialTimeout: config.DialTimeout, + ReadTimeout: config.ReadTimeout, + WriteTimeout: config.WriteTimeout, + PoolSize: config.PoolSize, + MinIdleConns: config.MinIdleConns, + IdleTimeout: config.IdleTimeout, + }) + return &SentinelClient{cli: cli}, nil +} + +func (c *SentinelClient) GetCli() redis.UniversalClient { + return c.cli +} + +func (c *SentinelClient) Ping(ctx context.Context) (string, error) { + return c.cli.Ping(ctx).Result() +} + +func (c *SentinelClient) Set(ctx context.Context, key string, value interface{}, duration time.Duration) (string, error) { + return c.cli.Set(ctx, key, value, duration).Result() +} + +func (c *SentinelClient) SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (bool, error) { + return c.cli.SetNX(ctx, key, value, expiration).Result() +} + +func (c *SentinelClient) Get(ctx context.Context, key string) (string, error) { + return c.cli.Get(ctx, key).Result() +} + +func (c *SentinelClient) Exists(ctx context.Context, key ...string) (int64, error) { + return c.cli.Exists(ctx, key...).Result() +} + +func (c *SentinelClient) SetEX(ctx context.Context, key string, value interface{}, expiration time.Duration) (string, error) { + return c.cli.SetEX(ctx, key, value, expiration).Result() +} + +func (c *SentinelClient) Del(ctx context.Context, key string) (int64, error) { + return c.cli.Del(ctx, key).Result() +} + +func (c *SentinelClient) Expire(ctx context.Context, key string, duration time.Duration) (bool, error) { + return c.cli.Expire(ctx, key, duration).Result() +} diff --git a/bcs-common/common/redisclient/sentinel_test.go b/bcs-common/common/redisclient/sentinel_test.go new file mode 100644 index 0000000000..a94f4e88ee --- /dev/null +++ b/bcs-common/common/redisclient/sentinel_test.go @@ -0,0 +1,61 @@ +package redisclient + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +// setupClusterClient function for initializing Redis SentinelClient +func setupSentinel(t *testing.T) *SentinelClient { + config := Config{ + Mode: SentinelMode, + Addrs: []string{"127.0.0.1:5001"}, // Sentinel addresses + MasterName: "mymaster", // Master name + DB: 0, + Password: "", + } + client, err := NewSentinelClient(config) + assert.NoError(t, err) + assert.NotNil(t, client) + return client +} + +// TestSentinelClientPing tests SentinelClient connectivity +func TestSentinelClientPing(t *testing.T) { + client := setupSentinel(t) + result, err := client.GetCli().Ping(context.TODO()).Result() + assert.NoError(t, err) + assert.Equal(t, "PONG", result) +} + +// TestSentinelClient tests SentinelClient basic functionality +func TestSentinelClient(t *testing.T) { + client := setupSentinel(t) + ctx := context.Background() + + // Test Set operation + _, err := client.Set(ctx, "key1", "value1", 10*time.Second) + assert.NoError(t, err) + + // Test Get operation + val, err := client.Get(ctx, "key1") + assert.NoError(t, err) + assert.Equal(t, "value1", val) + + // Test Exists operation + exists, err := client.Exists(ctx, "key1") + assert.NoError(t, err) + assert.Equal(t, int64(1), exists) + + // Test Del operation + _, err = client.Del(ctx, "key1") + assert.NoError(t, err) + + // Test if the key has been deleted + exists, err = client.Exists(ctx, "key1") + assert.NoError(t, err) + assert.Equal(t, int64(0), exists) +} diff --git a/bcs-common/common/redisclient/single.go b/bcs-common/common/redisclient/single.go new file mode 100644 index 0000000000..e7bfecb098 --- /dev/null +++ b/bcs-common/common/redisclient/single.go @@ -0,0 +1,81 @@ +package redisclient + +import ( + "context" + "errors" + "time" + + "github.com/go-redis/redis/v8" +) + +type SingleClient struct { + cli *redis.Client +} + +// NewSingleClient init SingleClient from config +func NewSingleClient(config Config) (*SingleClient, error) { + if config.Mode != SingleMode { + return nil, errors.New("redis mode not supported") + } + if len(config.Addrs) == 0 { + return nil, errors.New("address is empty") + } + cli := redis.NewClient(&redis.Options{ + Addr: config.Addrs[0], + Password: config.Password, + DB: config.DB, + DialTimeout: config.DialTimeout, + ReadTimeout: config.ReadTimeout, + WriteTimeout: config.WriteTimeout, + PoolSize: config.PoolSize, + MinIdleConns: config.MinIdleConns, + IdleTimeout: config.IdleTimeout, + }) + return &SingleClient{cli: cli}, nil +} + +// NewSingleClientFromDSN init SingleClient by dsn +func NewSingleClientFromDSN(dsn string) (*SingleClient, error) { + options, err := redis.ParseURL(dsn) + if err != nil { + return nil, err + } + cli := redis.NewClient(options) + return &SingleClient{cli: cli}, nil +} + +func (c *SingleClient) GetCli() redis.UniversalClient { + return c.cli +} + +func (c *SingleClient) Ping(ctx context.Context) (string, error) { + return c.cli.Ping(ctx).Result() +} + +func (c *SingleClient) Get(ctx context.Context, key string) (string, error) { + return c.cli.Get(ctx, key).Result() +} + +func (c *SingleClient) Set(ctx context.Context, key string, value interface{}, duration time.Duration) (string, error) { + return c.cli.Set(ctx, key, value, duration).Result() +} + +func (c *SingleClient) SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (bool, error) { + return c.cli.SetNX(ctx, key, value, expiration).Result() +} + +func (c *SingleClient) SetEX(ctx context.Context, key string, value interface{}, expiration time.Duration) (string, error) { + return c.cli.SetEX(ctx, key, value, expiration).Result() +} + +func (c *SingleClient) Exists(ctx context.Context, key ...string) (int64, error) { + return c.cli.Exists(ctx, key...).Result() +} + +func (c *SingleClient) Del(ctx context.Context, key string) (int64, error) { + return c.cli.Del(ctx, key).Result() +} + +func (c *SingleClient) Expire(ctx context.Context, key string, duration time.Duration) (bool, error) { + return c.cli.Expire(ctx, key, duration).Result() +} diff --git a/bcs-common/common/redisclient/single_test.go b/bcs-common/common/redisclient/single_test.go new file mode 100644 index 0000000000..339b18d6b8 --- /dev/null +++ b/bcs-common/common/redisclient/single_test.go @@ -0,0 +1,119 @@ +package redisclient + +import ( + "context" + "testing" + "time" + + "github.com/go-redis/redis/v8" + "github.com/stretchr/testify/assert" +) + +func setupSingleClient() Client { + // 创建 Redis 单机配置 + config := Config{ + Mode: SingleMode, + Addrs: []string{"127.0.0.1:6379"}, + DB: 0, + } + + // 初始化 Redis 客户端,使用可选参数设置连接超时 + client, _ := NewClient(config) + return client +} + +// Ping 测试 +func TestPing(t *testing.T) { + client := setupSingleClient() + result, err := client.GetCli().Ping(context.TODO()).Result() + assert.NoError(t, err) + assert.Equal(t, result, "PONG") +} + +// 测试 SingleClient 基础功能 +func TestSingleClient(t *testing.T) { + client := setupSingleClient() + assert.NotNil(t, client) + + ctx := context.Background() + + // 测试 Set 操作 + _, err := client.Set(ctx, "key1", "value1", 10*time.Second) + assert.NoError(t, err) + + // 测试 Get 操作 + val, err := client.Get(ctx, "key1") + assert.NoError(t, err) + assert.Equal(t, "value1", val) + + // 测试键存在性 + exists, err := client.Exists(ctx, "key1") + assert.NoError(t, err) + assert.Equal(t, int64(1), exists) + + // 测试 Del 操作 + _, err = client.Del(ctx, "key1") + assert.NoError(t, err) + + // 测试键是否已删除 + exists, err = client.Exists(ctx, "key1") + assert.NoError(t, err) + assert.Equal(t, int64(0), exists) + +} + +// 测试 SetEX 和 SetNX 操作 +func TestSingleClientSetEXAndSetNX(t *testing.T) { + client := setupSingleClient() + assert.NotNil(t, client) + + ctx := context.Background() + + // 测试 SetEX 操作,设置带有过期时间的键 + _, err := client.SetEX(ctx, "key2", "value2", 5*time.Second) + assert.NoError(t, err) + + // 获取 key2,确保值正确 + val, err := client.Get(ctx, "key2") + assert.NoError(t, err) + assert.Equal(t, "value2", val) + + // 确认 key2 在 Redis 中存在 + exists, err := client.Exists(ctx, "key2") + assert.NoError(t, err) + assert.Equal(t, int64(1), exists) + + // 等待过期时间后检查键是否存在 + time.Sleep(6 * time.Second) + exists, err = client.Exists(ctx, "key2") + assert.NoError(t, err) + assert.Equal(t, int64(0), exists) + + // 测试 SetNX 操作,只有当键不存在时才能设置 + success, err := client.SetNX(ctx, "key3", "value3", 10*time.Second) + assert.NoError(t, err) + assert.True(t, success) + + // 再次尝试 SetNX 操作,这次应该返回 false,因为键已经存在 + success, err = client.SetNX(ctx, "key3", "value3", 10*time.Second) + assert.NoError(t, err) + assert.False(t, success) + + // 删除 key3 + _, err = client.Del(ctx, "key3") + assert.NoError(t, err) +} + +// 测试边界情况,例如不存在的键 +func TestSingleClientGetNonExistentKey(t *testing.T) { + client := setupSingleClient() + assert.NotNil(t, client) + + ctx := context.Background() + + // 测试获取不存在的键,应该返回空字符串和 redis.Nil 错误 + val, err := client.Get(ctx, "nonexistent") + assert.Error(t, err) + assert.Equal(t, redis.Nil, err) + assert.Equal(t, "", val) +} diff --git a/bcs-common/common/redisclient/types.go b/bcs-common/common/redisclient/types.go new file mode 100644 index 0000000000..7e9744a872 --- /dev/null +++ b/bcs-common/common/redisclient/types.go @@ -0,0 +1,55 @@ +package redisclient + +import ( + "context" + "fmt" + "time" + + "github.com/alicebob/miniredis" + "github.com/go-redis/redis/v8" +) + +type RedisMode string + +const ( + SingleMode RedisMode = "single" // 单机模式 + SentinelMode RedisMode = "sentinel" // 哨兵模式 + ClusterMode RedisMode = "cluster" // 集群模式 +) + +type Client interface { + GetCli() redis.UniversalClient + Ping(ctx context.Context) (string, error) + Get(ctx context.Context, key string) (string, error) + Exists(ctx context.Context, key ...string) (int64, error) + Set(ctx context.Context, key string, value interface{}, duration time.Duration) (string, error) + SetEX(ctx context.Context, key string, value interface{}, expiration time.Duration) (string, error) + SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (bool, error) + Del(ctx context.Context, key string) (int64, error) + Expire(ctx context.Context, key string, duration time.Duration) (bool, error) +} + +// NewClient 根据配置文件创建不同部署模式的 redis 客户端 +func NewClient(config Config) (Client, error) { + switch config.Mode { + case SingleMode: + return NewSingleClient(config) + case SentinelMode: + return NewSentinelClient(config) + case ClusterMode: + return NewClusterClient(config) + } + return nil, fmt.Errorf("invalid config mode: %s", config.Mode) +} + +// NewTestClient 创建用于单元测试的 redis 客户端 +func NewTestClient() (Client, error) { + mr, err := miniredis.Run() + if err != nil { + return nil, err + } + client := redis.NewClient(&redis.Options{ + Addr: mr.Addr(), + }) + return &SingleClient{cli: client}, nil +} diff --git a/bcs-common/go.mod b/bcs-common/go.mod index e8ef98f499..a7ac29b71c 100644 --- a/bcs-common/go.mod +++ b/bcs-common/go.mod @@ -6,6 +6,7 @@ require ( github.com/Tencent/bk-bcs/bcs-runtime/bcs-k8s/kubernetes/common v0.0.0-20220330120237-0bbed74dcf6d github.com/TencentBlueKing/bk-audit-go-sdk v0.0.6 github.com/TencentBlueKing/iam-go-sdk v0.1.6 + github.com/alicebob/miniredis v2.5.0+incompatible github.com/bitly/go-simplejson v0.5.0 github.com/docker/engine-api v0.4.0 github.com/dustin/go-humanize v1.0.0 @@ -18,6 +19,7 @@ require ( github.com/go-micro/plugins/v4/broker/stan v1.1.0 github.com/go-micro/plugins/v4/registry/etcd v1.1.0 github.com/go-playground/validator/v10 v10.19.0 + github.com/go-redis/redis/v8 v8.11.5 github.com/go-resty/resty/v2 v2.12.0 github.com/go-sql-driver/mysql v1.7.1 github.com/golang-jwt/jwt/v4 v4.5.0 @@ -74,6 +76,7 @@ require ( github.com/PuerkitoBio/purell v1.1.1 // indirect github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect github.com/TencentBlueKing/gopkg v1.1.0 // indirect + github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bytedance/sonic v1.9.1 // indirect @@ -86,6 +89,7 @@ require ( github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/cyphar/filepath-securejoin v0.2.4 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/docker/go-connections v0.4.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/emirpasic/gods v1.18.1 // indirect @@ -114,6 +118,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/gomodule/redigo v1.9.2 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/googleapis/gnostic v0.5.5 // indirect @@ -165,6 +170,7 @@ require ( github.com/xdg-go/scram v1.1.1 // indirect github.com/xdg-go/stringprep v1.0.3 // indirect github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect + github.com/yuin/gopher-lua v1.1.1 // indirect go.etcd.io/etcd/api/v3 v3.5.2 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.2 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.17.0 // indirect From a041d222e08efe78b760ad759d0dc213a218be17 Mon Sep 17 00:00:00 2001 From: carlchen Date: Sun, 29 Sep 2024 20:40:31 +0800 Subject: [PATCH 2/2] =?UTF-8?q?feat:=20cluster-resource=20=E6=94=AF?= =?UTF-8?q?=E6=8C=81redis=E5=93=A8=E5=85=B5=E5=92=8C=E9=9B=86=E7=BE=A4?= =?UTF-8?q?=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bcs-services/cluster-resources/cmd/check.go | 4 +- bcs-services/cluster-resources/etc/conf.yaml | 4 + bcs-services/cluster-resources/go.mod | 8 +- .../cluster-resources/pkg/cache/redis/init.go | 84 ++++++++++++------- .../pkg/cache/redis/init_test.go | 2 +- .../pkg/cache/redis/redis.go | 25 +++--- .../pkg/cache/redis/testing.go | 32 ------- .../cluster-resources/pkg/config/config.go | 2 + .../pkg/handler/basic/basic.go | 2 +- 9 files changed, 79 insertions(+), 84 deletions(-) delete mode 100644 bcs-services/cluster-resources/pkg/cache/redis/testing.go diff --git a/bcs-services/cluster-resources/cmd/check.go b/bcs-services/cluster-resources/cmd/check.go index e9ceb05ba1..dd231da596 100644 --- a/bcs-services/cluster-resources/cmd/check.go +++ b/bcs-services/cluster-resources/cmd/check.go @@ -117,8 +117,8 @@ func (c *DependencyServiceChecker) DoAndExit() { // doOnce 对依赖服务进行一次检查,任意服务不可用,都返回错误 func (c *DependencyServiceChecker) doOnce() error { // 检查 Redis 服务,若服务异常,则返回错误 - rds := redis.NewStandaloneClient(&c.conf.Redis) - if _, err := rds.Ping(context.TODO()).Result(); err != nil { + rds, err := redis.NewRedisClient(&c.conf.Redis) + if _, err = rds.Ping(context.TODO()); err != nil { return err } diff --git a/bcs-services/cluster-resources/etc/conf.yaml b/bcs-services/cluster-resources/etc/conf.yaml index 5bffa6f9db..5419889e2c 100644 --- a/bcs-services/cluster-resources/etc/conf.yaml +++ b/bcs-services/cluster-resources/etc/conf.yaml @@ -70,10 +70,14 @@ log: # Redis 配置信息 redis: + # 地址列表用 , 分隔多个节点 address: "127.0.0.1:6379" db: 0 password: "" # 以下项非必须可不启用 + # redis 的模式 默认使用 + # redisMode: single # 可选填参数: single sentinel cluster + # masterName: master # dialTimeout: 2 # readTimeout: 1 # writeTimeout: 1 diff --git a/bcs-services/cluster-resources/go.mod b/bcs-services/cluster-resources/go.mod index f3ad6a499e..8d2022a643 100644 --- a/bcs-services/cluster-resources/go.mod +++ b/bcs-services/cluster-resources/go.mod @@ -46,7 +46,7 @@ require ( // fork 自 https://github.com/signalfx/splunk-otel-go/tree/main/instrumentation/k8s.io/client-go/splunkclient-go // 待升级到 go1.18 后,可直接引用社区的包 github.com/adevjoe/opentelemetry-go-contrib/instrumentation/k8s.io/client-go v1.0.2 - github.com/alicebob/miniredis v2.5.0+incompatible + github.com/alicebob/miniredis v2.5.0+incompatible // indirect github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de github.com/dustin/go-humanize v1.0.0 github.com/envoyproxy/protoc-gen-validate v1.0.4 @@ -54,7 +54,7 @@ require ( github.com/go-micro/plugins/v4/registry/etcd v1.1.0 github.com/go-micro/plugins/v4/server/grpc v1.2.0 github.com/go-redis/cache/v8 v8.4.3 - github.com/go-redis/redis/v8 v8.11.5 + github.com/go-redis/redis/v8 v8.11.5 // indirect github.com/go-resty/resty/v2 v2.12.0 github.com/golang-jwt/jwt/v4 v4.5.0 github.com/golang/glog v1.2.0 @@ -138,7 +138,7 @@ require ( github.com/gobwas/ws v1.0.4 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect - github.com/gomodule/redigo v1.8.9 // indirect + github.com/gomodule/redigo v1.9.2 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/gorilla/handlers v1.5.1 // indirect @@ -185,7 +185,7 @@ require ( github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect - github.com/yuin/gopher-lua v1.1.0 // indirect + github.com/yuin/gopher-lua v1.1.1 // indirect go.etcd.io/etcd/api/v3 v3.5.2 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.2 // indirect go.etcd.io/etcd/client/v3 v3.5.2 // indirect diff --git a/bcs-services/cluster-resources/pkg/cache/redis/init.go b/bcs-services/cluster-resources/pkg/cache/redis/init.go index 01c640908d..5809c67c04 100644 --- a/bcs-services/cluster-resources/pkg/cache/redis/init.go +++ b/bcs-services/cluster-resources/pkg/cache/redis/init.go @@ -19,16 +19,16 @@ import ( "sync" "time" - redisotel2 "github.com/go-redis/redis/extra/redisotel/v8" - "github.com/go-redis/redis/v8" - + "github.com/Tencent/bk-bcs/bcs-common/common/redisclient" "github.com/Tencent/bk-bcs/bcs-services/cluster-resources/pkg/common/runmode" crRuntime "github.com/Tencent/bk-bcs/bcs-services/cluster-resources/pkg/common/runtime" "github.com/Tencent/bk-bcs/bcs-services/cluster-resources/pkg/config" log "github.com/Tencent/bk-bcs/bcs-services/cluster-resources/pkg/logging" + "github.com/Tencent/bk-bcs/bcs-services/cluster-resources/pkg/util/stringx" + redisotel2 "github.com/go-redis/redis/extra/redisotel/v8" ) -var rds *redis.Client +var rds redisclient.Client var initOnce sync.Once @@ -45,46 +45,61 @@ const ( minIdleConnsMultiple = 10 // idleTimeout unit: min idleTimeout = 3 + // Sentinel mode + Sentinel = "sentinel" + // Cluster mode + Cluster = "cluster" + // Single mode + Single = "single" ) -// NewStandaloneClient 创建单实例模式 RedisClient(非哨兵模式) -func NewStandaloneClient(conf *config.RedisConf) *redis.Client { - opt := &redis.Options{ - Addr: conf.Address, - Password: conf.Password, +// NewRedisClient 根据配置创建不同模式的 RedisClient +func NewRedisClient(conf *config.RedisConf) (redisclient.Client, error) { + clientConf := redisclient.Config{ + Addrs: stringx.Split(conf.Address), DB: conf.DB, + Password: conf.Password, + // 默认配置 + DialTimeout: time.Duration(dialTimeout) * time.Second, + ReadTimeout: time.Duration(readTimeout) * time.Second, + WriteTimeout: time.Duration(writeTimeout) * time.Second, + PoolSize: pollSizeMultiple * runtime.NumCPU(), + MinIdleConns: minIdleConnsMultiple * runtime.NumCPU(), + IdleTimeout: time.Duration(idleTimeout) * time.Minute, } - // 默认配置 - opt.DialTimeout = time.Duration(dialTimeout) * time.Second - opt.ReadTimeout = time.Duration(readTimeout) * time.Second - opt.WriteTimeout = time.Duration(writeTimeout) * time.Second - opt.PoolSize = pollSizeMultiple * runtime.NumCPU() - opt.MinIdleConns = minIdleConnsMultiple * runtime.NumCPU() - opt.IdleTimeout = time.Duration(idleTimeout) * time.Minute - // 若配置中指定,则使用 if conf.DialTimeout > 0 { - opt.DialTimeout = time.Duration(conf.DialTimeout) * time.Second + clientConf.DialTimeout = time.Duration(conf.DialTimeout) * time.Second } if conf.ReadTimeout > 0 { - opt.ReadTimeout = time.Duration(conf.ReadTimeout) * time.Second + clientConf.ReadTimeout = time.Duration(conf.ReadTimeout) * time.Second } if conf.WriteTimeout > 0 { - opt.WriteTimeout = time.Duration(conf.WriteTimeout) * time.Second + clientConf.WriteTimeout = time.Duration(conf.WriteTimeout) * time.Second } if conf.PoolSize > 0 { - opt.PoolSize = conf.PoolSize + clientConf.PoolSize = conf.PoolSize } if conf.MinIdleConns > 0 { - opt.MinIdleConns = conf.MinIdleConns + clientConf.MinIdleConns = conf.MinIdleConns + } + + switch conf.RedisMode { + case Sentinel: + clientConf.Mode = redisclient.SentinelMode + clientConf.MasterName = conf.MasterName + case Cluster: + clientConf.Mode = redisclient.ClusterMode + default: + clientConf.Mode = redisclient.SingleMode } log.Info(context.TODO(), - "start connect redis: %s [db=%d, dialTimeout=%s, readTimeout=%s, writeTimeout=%s, poolSize=%d, minIdleConns=%d, idleTimeout=%s]", //nolint:lll - opt.Addr, opt.DB, opt.DialTimeout, opt.ReadTimeout, opt.WriteTimeout, opt.PoolSize, opt.MinIdleConns, opt.IdleTimeout) + "start connect redis: %v [mode=%s db=%d, dialTimeout=%s, readTimeout=%s, writeTimeout=%s, poolSize=%d, minIdleConns=%d, idleTimeout=%s]", //nolint:lll + clientConf.Addrs, clientConf.DB, clientConf.Mode, clientConf.DialTimeout, clientConf.ReadTimeout, clientConf.WriteTimeout, clientConf.PoolSize, clientConf.MinIdleConns, clientConf.IdleTimeout) - return redis.NewClient(opt) + return redisclient.NewClient(clientConf) } // InitRedisClient 初始化 Redis 客户端 @@ -93,10 +108,14 @@ func InitRedisClient(conf *config.RedisConf) { return } initOnce.Do(func() { - rds = NewStandaloneClient(conf) - rds.AddHook(redisotel2.NewTracingHook()) + var err error + // 初始化失败,panic + if rds, err = NewRedisClient(conf); err != nil { + panic(err) + } + rds.GetCli().AddHook(redisotel2.NewTracingHook()) // 若 Redis 服务异常,应重置 rds 并 panic - if _, err := rds.Ping(context.TODO()).Result(); err != nil { + if _, err = rds.Ping(context.TODO()); err != nil { rds = nil panic(err) } @@ -104,12 +123,15 @@ func InitRedisClient(conf *config.RedisConf) { } // GetDefaultClient 获取默认 Redis 客户端 -func GetDefaultClient() *redis.Client { +func GetDefaultClient() redisclient.Client { if rds == nil { // 单元测试模式下,自动启用测试用 Redis,否则需要提前初始化 if crRuntime.RunMode == runmode.UnitTest { - rds = NewTestRedisClient() - rds.AddHook(redisotel2.NewTracingHook()) + var err error + if rds, err = redisclient.NewTestClient(); err != nil { + panic(err) + } + rds.GetCli().AddHook(redisotel2.NewTracingHook()) return rds } panic("prod and stag run mode need init redis!") diff --git a/bcs-services/cluster-resources/pkg/cache/redis/init_test.go b/bcs-services/cluster-resources/pkg/cache/redis/init_test.go index 5e21ffbff1..c4b0a9fc34 100644 --- a/bcs-services/cluster-resources/pkg/cache/redis/init_test.go +++ b/bcs-services/cluster-resources/pkg/cache/redis/init_test.go @@ -43,6 +43,6 @@ func TestGetDefaultClient(t *testing.T) { rdsCli := GetDefaultClient() assert.NotNil(t, rdsCli) - ret, _ := rds.Ping(context.TODO()).Result() + ret, _ := rds.Ping(context.TODO()) assert.Equal(t, "PONG", ret) } diff --git a/bcs-services/cluster-resources/pkg/cache/redis/redis.go b/bcs-services/cluster-resources/pkg/cache/redis/redis.go index 361ddd8e4a..b0a764a2d1 100644 --- a/bcs-services/cluster-resources/pkg/cache/redis/redis.go +++ b/bcs-services/cluster-resources/pkg/cache/redis/redis.go @@ -18,10 +18,9 @@ import ( "fmt" "time" - "github.com/go-redis/cache/v8" - "github.com/go-redis/redis/v8" - + "github.com/Tencent/bk-bcs/bcs-common/common/redisclient" crCache "github.com/Tencent/bk-bcs/bcs-services/cluster-resources/pkg/cache" + "github.com/go-redis/cache/v8" ) const ( @@ -31,11 +30,11 @@ const ( // Cache 缓存实例 type Cache struct { - name string // 缓存键名 - keyPrefix string // 缓存键前缀 - codec *cache.Cache // go-redis cache - cli *redis.Client // redis client - exp time.Duration // 默认过期时间 + name string // 缓存键名 + keyPrefix string // 缓存键前缀 + codec *cache.Cache // go-redis cache + cli redisclient.Client // redis client + exp time.Duration // 默认过期时间 } // NewCache 新建 cache 实例 @@ -46,7 +45,7 @@ func NewCache(name string, expiration time.Duration) *Cache { keyPrefix := fmt.Sprintf("%s:%s", CacheKeyPrefix, name) codec := cache.New(&cache.Options{ - Redis: cli, + Redis: cli.GetCli(), }) return &Cache{ @@ -80,7 +79,7 @@ func (c *Cache) Set(ctx context.Context, key crCache.Key, value interface{}, dur // Exists 检查 key 在 redis 中是否存在 func (c *Cache) Exists(ctx context.Context, key crCache.Key) bool { k := c.genKey(key.Key()) - count, err := c.cli.Exists(ctx, k).Result() + count, err := c.cli.Exists(ctx, k) return err == nil && count == 1 } @@ -93,15 +92,15 @@ func (c *Cache) Get(ctx context.Context, key crCache.Key, value interface{}) err // Delete 从 redis 中删除指定的键 func (c *Cache) Delete(ctx context.Context, key crCache.Key) error { k := c.genKey(key.Key()) - _, err := c.cli.Del(ctx, k).Result() + _, err := c.cli.Del(ctx, k) return err } // DeleteByPrefix 根据键前缀删除缓存,慎用! func (c *Cache) DeleteByPrefix(ctx context.Context, prefix string) error { - iter := c.cli.Scan(ctx, 0, c.genKey(prefix)+"*", 0).Iterator() + iter := c.cli.GetCli().Scan(ctx, 0, c.genKey(prefix)+"*", 0).Iterator() for iter.Next(ctx) { - if err := c.cli.Del(ctx, iter.Val()).Err(); err != nil { + if _, err := c.cli.Del(ctx, iter.Val()); err != nil { return err } } diff --git a/bcs-services/cluster-resources/pkg/cache/redis/testing.go b/bcs-services/cluster-resources/pkg/cache/redis/testing.go deleted file mode 100644 index 12b188bdfb..0000000000 --- a/bcs-services/cluster-resources/pkg/cache/redis/testing.go +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Tencent is pleased to support the open source community by making Blueking Container Service available. - * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. - * Licensed under the MIT License (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * http://opensource.org/licenses/MIT - * Unless required by applicable law or agreed to in writing, software distributed under - * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions and - * limitations under the License. - */ - -package redis - -import ( - "github.com/alicebob/miniredis" - "github.com/go-redis/redis/v8" -) - -// NewTestRedisClient 新建单元测试同 Redis Cli -func NewTestRedisClient() *redis.Client { - mr, err := miniredis.Run() - if err != nil { - panic(err) - } - - client := redis.NewClient(&redis.Options{ - Addr: mr.Addr(), - }) - - return client -} diff --git a/bcs-services/cluster-resources/pkg/config/config.go b/bcs-services/cluster-resources/pkg/config/config.go index dc26d7879f..d0e0f16536 100644 --- a/bcs-services/cluster-resources/pkg/config/config.go +++ b/bcs-services/cluster-resources/pkg/config/config.go @@ -270,6 +270,8 @@ type RedisConf struct { Address string `yaml:"address" usage:"Redis Server Address"` DB int `yaml:"db" usage:"Redis DB"` Password string `yaml:"password" usage:"Redis Password"` + RedisMode string `yaml:"redisMode" usage:"Redis Mode"` + MasterName string `yaml:"masterName" usage:"Redis MasterName for Sentinel Mode"` DialTimeout int `yaml:"dialTimeout" usage:"Redis Dial Timeout"` ReadTimeout int `yaml:"readTimeout" usage:"Redis Read Timeout(s)"` WriteTimeout int `yaml:"writeTimeout" usage:"Redis Write Timeout(s)"` diff --git a/bcs-services/cluster-resources/pkg/handler/basic/basic.go b/bcs-services/cluster-resources/pkg/handler/basic/basic.go index a726194b7c..53b9483cec 100644 --- a/bcs-services/cluster-resources/pkg/handler/basic/basic.go +++ b/bcs-services/cluster-resources/pkg/handler/basic/basic.go @@ -86,7 +86,7 @@ func (h *Handler) Healthz( allOK := true // 检查 redis 状态 - ret, err := redis.GetDefaultClient().Ping(ctx).Result() + ret, err := redis.GetDefaultClient().Ping(ctx) if ret != "PONG" || err != nil { resp.Redis = genHealthzStatus(false, "Redis Ping Failed") allOK = false