Skip to content

Commit

Permalink
feat: 新增拉取服务配置时更新服务拉取时间以及更改路径规则
Browse files Browse the repository at this point in the history
  • Loading branch information
Ambition9186 committed Sep 10, 2024
1 parent 52028e6 commit 661d5dc
Show file tree
Hide file tree
Showing 26 changed files with 5,126 additions and 4,551 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Interface interface {
GetReleasedKvValue(kt *kit.Kit, bizID, appID, releaseID uint32, key string) (string, error)
SetClientMetric(kt *kit.Kit, bizID, appID uint32, payload []byte) error
BatchUpsertClientMetrics(kt *kit.Kit, clientData []*pbclient.Client, clientEventData []*pbce.ClientEvent) error
BatchUpdateLastConsumedTime(kt *kit.Kit, bizID uint32, appIDs []uint32) error
}

// New initialize a cache client.
Expand Down Expand Up @@ -130,3 +131,16 @@ func (c *client) BatchUpsertClientMetrics(kt *kit.Kit, clientData []*pbclient.Cl
}
return nil
}

// BatchUpdateLastConsumedTime 批量更新服务拉取时间
func (c *client) BatchUpdateLastConsumedTime(kit *kit.Kit, bizID uint32, appIDs []uint32) error {

if _, err := c.db.BatchUpdateLastConsumedTime(kit.Ctx, &pbds.BatchUpdateLastConsumedTimeReq{
BizId: bizID,
AppIds: appIDs,
}); err != nil {
return err
}

return nil
}
12 changes: 12 additions & 0 deletions bcs-services/bcs-bscp/cmd/cache-service/service/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,15 @@ func (s *Service) SetClientMetric(ctx context.Context, req *pbcs.SetClientMetric
}
return &pbcs.SetClientMetricResp{}, nil
}

// BatchUpdateLastConsumedTime 批量更新服务拉取时间
func (s *Service) BatchUpdateLastConsumedTime(ctx context.Context, req *pbcs.BatchUpdateLastConsumedTimeReq) (
*pbcs.BatchUpdateLastConsumedTimeResp, error) {

kit := kit.FromGrpcContext(ctx)
err := s.op.BatchUpdateLastConsumedTime(kit, req.GetBizId(), req.GetAppIds())
if err != nil {
return nil, err
}
return &pbcs.BatchUpdateLastConsumedTimeResp{}, nil
}
1 change: 1 addition & 0 deletions bcs-services/bcs-bscp/cmd/config-server/service/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (s *Service) Publish(ctx context.Context, req *pbcs.PublishReq) (
resp := &pbcs.PublishResp{
Id: rp.PublishedStrategyHistoryId,
HaveCredentials: rp.HaveCredentials,
HavePull: rp.HavePull,
}
return resp, nil
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/

package migrations

import (
"time"

"gorm.io/gorm"

"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/cmd/data-service/db-migration/migrator"
)

func init() {
// add current migration to migrator
migrator.GetMigrator().AddMigration(&migrator.Migration{
Version: "20240909174526",
Name: "20240909174526_modify_app",
Mode: migrator.GormMode,
Up: mig20240909174526Up,
Down: mig20240909174526Down,
})
}

// mig20240909174526Up for up migration
func mig20240909174526Up(tx *gorm.DB) error {
// Applications : applications
type Applications struct {
LastConsumedTime time.Time `gorm:"column:last_consumed_time;type:datetime(6)"`
}

// Applications add new column
if !tx.Migrator().HasColumn(&Applications{}, "last_consumed_time") {
if err := tx.Migrator().AddColumn(&Applications{}, "last_consumed_time"); err != nil {
return err
}
}

return nil
}

// mig20240909174526Down for down migration
func mig20240909174526Down(tx *gorm.DB) error {

// Applications : applications
type Applications struct {
LastConsumedTime time.Time `gorm:"column:last_consumed_time;type:datetime(6)"`
}

// Applications drop column
if tx.Migrator().HasColumn(&Applications{}, "last_consumed_time") {
if err := tx.Migrator().DropColumn(&Applications{}, "last_consumed_time"); err != nil {
return err
}
}

return nil
}
13 changes: 13 additions & 0 deletions bcs-services/bcs-bscp/cmd/data-service/service/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,3 +424,16 @@ func (s *Service) validateBizExist(kt *kit.Kit, bizID uint32) error {

return nil
}

// BatchUpdateLastConsumedTime 批量更新最后一次拉取时间
func (s *Service) BatchUpdateLastConsumedTime(ctx context.Context, req *pbds.BatchUpdateLastConsumedTimeReq) (
*pbds.BatchUpdateLastConsumedTimeResp, error) {
kit := kit.FromGrpcContext(ctx)

err := s.dao.App().BatchUpdateLastConsumedTime(kit, req.GetBizId(), req.GetAppIds())
if err != nil {
return nil, err
}

return &pbds.BatchUpdateLastConsumedTimeResp{}, nil
}
15 changes: 15 additions & 0 deletions bcs-services/bcs-bscp/cmd/data-service/service/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,20 @@ func (s *Service) Publish(ctx context.Context, req *pbds.PublishReq) (*pbds.Publ
}
return nil, err
}

app, err := s.dao.App().GetByID(grpcKit, req.AppId)
if err != nil {
if rErr := tx.Rollback(); rErr != nil {
logs.Errorf("transaction rollback failed, err: %v, rid: %s", rErr, grpcKit.Rid)
}
return nil, err
}

var havePull bool
if !app.Spec.LastConsumedTime.IsZero() {
havePull = true
}

haveCredentials, err := s.checkAppHaveCredentials(grpcKit, req.BizId, req.AppId)
if err != nil {
if rErr := tx.Rollback(); rErr != nil {
Expand All @@ -126,6 +140,7 @@ func (s *Service) Publish(ctx context.Context, req *pbds.PublishReq) (*pbds.Publ
resp := &pbds.PublishResp{
PublishedStrategyHistoryId: pshID,
HaveCredentials: haveCredentials,
HavePull: havePull,
}
return resp, nil
}
Expand Down
2 changes: 1 addition & 1 deletion bcs-services/bcs-bscp/cmd/feed-server/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ func (fs *feedServer) listenAndServe() error {
// generate standard grpc server grpcMetrics.
grpcMetrics := grpc_prometheus.NewServerMetrics()
grpcMetrics.EnableHandlingTimeHistogram(metrics.GrpcBuckets)

recoveryOpt := grpc_recovery.WithRecoveryHandlerContext(brpc.RecoveryHandlerFuncContext)

opts := []grpc.ServerOption{grpc.MaxRecvMsgSize(1 * 1024 * 1024),
Expand All @@ -137,6 +136,7 @@ func (fs *feedServer) listenAndServe() error {
brpc.LogUnaryServerInterceptor(),
grpcMetrics.UnaryServerInterceptor(),
service.FeedUnaryAuthInterceptor,
service.FeedUnaryUpdateLastConsumedTimeInterceptor,
grpc_recovery.UnaryServerInterceptor(recoveryOpt),
),
grpc.ChainStreamInterceptor(
Expand Down
12 changes: 12 additions & 0 deletions bcs-services/bcs-bscp/cmd/feed-server/bll/lcache/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,15 @@ func (ap *App) collectHitRate() {
}
}()
}

// BatchUpdateLastConsumedTime 批量更新服务拉取时间
func (ap *App) BatchUpdateLastConsumedTime(kt *kit.Kit, bizID uint32, appIDs []uint32) error {

if _, err := ap.cs.CS().BatchUpdateLastConsumedTime(kt.Ctx, &pbcs.BatchUpdateLastConsumedTimeReq{
BizId: bizID,
AppIds: appIDs,
}); err != nil {
return err
}
return nil
}
85 changes: 85 additions & 0 deletions bcs-services/bcs-bscp/cmd/feed-server/service/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import (

"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/criteria/constant"
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/kit"
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/logs"
pbfs "github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/protocol/feed-server"
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/runtime/jsoni"
sfs "github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/sf-share"
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/types"
)

Expand Down Expand Up @@ -120,6 +124,87 @@ func FeedUnaryAuthInterceptor(
return handler(ctx, req)
}

// FeedUnaryUpdateLastConsumedTimeInterceptor feed 更新拉取时间中间件
func FeedUnaryUpdateLastConsumedTimeInterceptor(ctx context.Context, req interface{},
info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {

type lastConsumedTime struct {
BizID uint32
AppNames []string
AppIDs []uint32
}

param := lastConsumedTime{}

switch info.FullMethod {
case "/pbfs.Upstream/GetKvValue":
request := req.(*pbfs.GetKvValueReq)
param.BizID = request.BizId
param.AppNames = append(param.AppNames, request.GetAppMeta().App)
case "/pbfs.Upstream/PullKvMeta":
request := req.(*pbfs.PullKvMetaReq)
param.BizID = request.BizId
param.AppNames = append(param.AppNames, request.GetAppMeta().App)
case "/pbfs.Upstream/Messaging":
request := req.(*pbfs.MessagingMeta)
if sfs.MessagingType(request.Type) == sfs.VersionChangeMessage {
vc := new(sfs.VersionChangePayload)
if err := vc.Decode(request.Payload); err != nil {
logs.Errorf("version change message decoding failed, %s", err.Error())
return handler(ctx, req)
}
param.BizID = vc.BasicData.BizID
param.AppNames = append(param.AppNames, vc.Application.App)
}
case "/pbfs.Upstream/Watch":
request := req.(*pbfs.SideWatchMeta)
payload := new(sfs.SideWatchPayload)
if err := jsoni.Unmarshal(request.Payload, payload); err != nil {
logs.Errorf("parse request payload failed, %s", err.Error())
return handler(ctx, req)
}
param.BizID = payload.BizID
for _, v := range payload.Applications {
param.AppNames = append(param.AppNames, v.App)
}
case "/pbfs.Upstream/PullAppFileMeta":
request := req.(*pbfs.PullAppFileMetaReq)
param.BizID = request.BizId
param.AppNames = append(param.AppNames, request.GetAppMeta().App)
case "/pbfs.Upstream/GetDownloadURL":
request := req.(*pbfs.GetDownloadURLReq)
param.BizID = request.BizId
param.AppIDs = append(param.AppIDs, request.GetFileMeta().GetConfigItemAttachment().AppId)
default:
return handler(ctx, req)
}

if param.BizID != 0 {
ctx = context.WithValue(ctx, constant.BizIDKey, param.BizID) //nolint:staticcheck
svr := info.Server.(*Service)

if len(param.AppIDs) == 0 {
for _, appName := range param.AppNames {
appID, err := svr.bll.AppCache().GetAppID(kit.FromGrpcContext(ctx), param.BizID, appName)
if err != nil {
logs.Errorf("get app id failed, err: %v", err)
return handler(ctx, req)
}
param.AppIDs = append(param.AppIDs, appID)
}
}

if err := svr.bll.AppCache().BatchUpdateLastConsumedTime(kit.FromGrpcContext(ctx),
param.BizID, param.AppIDs); err != nil {
logs.Errorf("batch update app last consumed failed, err: %v", err)
return handler(ctx, req)
}
logs.Infof("batch update app last consumed time success")
}

return handler(ctx, req)
}

// wrappedStream stream 封装, 可自定义 context 传值
type wrappedStream struct {
grpc.ServerStream
Expand Down
38 changes: 6 additions & 32 deletions bcs-services/bcs-bscp/pkg/criteria/validator/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"fmt"
"path/filepath"
"regexp"
"strings"

"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/criteria/constant"
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/criteria/errf"
Expand All @@ -26,8 +25,8 @@ import (
)

// validUnixFileSubPathRegexp sub path support character:
// chinese, english, number, '-', '_', '#', '%', ',', '@', '^', '+', '=', '[', ']', '{', '}, '.'
var validUnixFileSubPathRegexp = regexp.MustCompile("^[\u4e00-\u9fa5A-Za-z0-9-_#%,.@^+=\\[\\]{}]+$")
// 必须以 / 开头,且不能出现连续的 /
var validUnixFileSubPathRegexp = regexp.MustCompile(`^\/([^\/])+$`)

// ValidateUnixFilePath validate unix os file path.
func ValidateUnixFilePath(kit *kit.Kit, path string) error {
Expand All @@ -39,35 +38,10 @@ func ValidateUnixFilePath(kit *kit.Kit, path string) error {
return errf.Errorf(errf.InvalidArgument, i18n.T(kit, "invalid path, length should <= 1024"))
}

// 1. should start with '/'
if !strings.HasPrefix(path, "/") {
return errf.Errorf(errf.InvalidArgument, i18n.T(kit, "invalid path, should start with '/'"))
}

// Split the path into parts
parts := strings.Split(path, "/")[1:] // Ignore the first empty part due to the leading '/'

if strings.HasSuffix(path, "/") {
parts = parts[:len(parts)-1] // Ignore the last empty part due to the trailing '/'
}

// Iterate over each part to validate
for _, part := range parts {

// 2. the verification path cannot all be '{'. '}'
if dotsRegexp.MatchString(part) {
return errf.Errorf(errf.InvalidArgument, i18n.T(kit, "invalid path %s, path cannot all be '.' ", part))
}

// 3. each sub path support character:
// chinese, english, number, '-', '_', '#', '%', ',', '@', '^', '+', '=', '[', ']', '{', '}'
if !validUnixFileSubPathRegexp.MatchString(part) {
return errf.Errorf(errf.InvalidArgument, i18n.T(kit, fmt.Sprintf(`invalid path, each sub path should only
contain chinese, english, number, '-', '_', '#', '%%', ',', '@', '^', '+', '=', '[', ']', '{', '}', '{'. '}`)))
}

// 4. each sub path should be separated by '/'
// (handled by strings.Split above)
// 必须以 / 开头,且不能出现连续的 /
if !validUnixFileSubPathRegexp.MatchString(path) {
return errf.Errorf(errf.InvalidArgument, i18n.T(kit, "invalid path %s, the path must start"+
"with '/' and cannot have consecutive '/'", path))
}

return nil
Expand Down
8 changes: 3 additions & 5 deletions bcs-services/bcs-bscp/pkg/criteria/validator/name.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,8 @@ func ValidateReleaseName(kit *kit.Kit, name string) error {
}

// qualifiedFileNameRegexp file name regexp.
// support character: chinese, english, number,
// '-', '_', '#', '%', ',', '@', '^', '+', '=', '[', ']', '{', '}, '.'
var qualifiedFileNameRegexp = regexp.MustCompile("^[\u4e00-\u9fa5A-Za-z0-9-_#%,.@^+=\\[\\]\\{\\}]+$")
// 文件名不能包含/
var qualifiedFileNameRegexp = regexp.MustCompile("^[^/]+$")

var dotsRegexp = regexp.MustCompile(`^\.+$`)

Expand All @@ -202,8 +201,7 @@ func ValidateFileName(kit *kit.Kit, name string) error {
}

if !qualifiedFileNameRegexp.MatchString(name) {
return errf.Errorf(errf.InvalidArgument, i18n.T(kit, fmt.Sprintf(`invalid name %s, should only contains chinese,
english, number, '-', '_', '#', '%%', ',', '@', '^', '+', '=', '[', ']', '{', '}', '.'`, name)))
return errf.Errorf(errf.InvalidArgument, i18n.T(kit, "invalid name %s, there can't be any /", name))
}

return nil
Expand Down
Loading

0 comments on commit 661d5dc

Please sign in to comment.