Skip to content

Commit

Permalink
feat: 新增拉取服务配置时更新服务拉取时间 (#3503)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ambition9186 committed Sep 23, 2024
1 parent 336ac63 commit a4c14e5
Show file tree
Hide file tree
Showing 20 changed files with 4,575 additions and 3,949 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Interface interface {
GetReleasedKvValue(kt *kit.Kit, bizID, appID, releaseID uint32, key string) (string, error)
SetClientMetric(kt *kit.Kit, bizID, appID uint32, payload []byte) error
BatchUpsertClientMetrics(kt *kit.Kit, clientData []*pbclient.Client, clientEventData []*pbce.ClientEvent) error
BatchUpdateLastConsumedTime(kt *kit.Kit, bizID uint32, appIDs []uint32) error
}

// New initialize a cache client.
Expand Down Expand Up @@ -130,3 +131,16 @@ func (c *client) BatchUpsertClientMetrics(kt *kit.Kit, clientData []*pbclient.Cl
}
return nil
}

// BatchUpdateLastConsumedTime 批量更新服务拉取时间
func (c *client) BatchUpdateLastConsumedTime(kit *kit.Kit, bizID uint32, appIDs []uint32) error {

if _, err := c.db.BatchUpdateLastConsumedTime(kit.Ctx, &pbds.BatchUpdateLastConsumedTimeReq{
BizId: bizID,
AppIds: appIDs,
}); err != nil {
return err
}

return nil
}
12 changes: 12 additions & 0 deletions bcs-services/bcs-bscp/cmd/cache-service/service/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,15 @@ func (s *Service) SetClientMetric(ctx context.Context, req *pbcs.SetClientMetric
}
return &pbcs.SetClientMetricResp{}, nil
}

// BatchUpdateLastConsumedTime 批量更新服务拉取时间
func (s *Service) BatchUpdateLastConsumedTime(ctx context.Context, req *pbcs.BatchUpdateLastConsumedTimeReq) (
*pbcs.BatchUpdateLastConsumedTimeResp, error) {

kit := kit.FromGrpcContext(ctx)
err := s.op.BatchUpdateLastConsumedTime(kit, req.GetBizId(), req.GetAppIds())
if err != nil {
return nil, err
}
return &pbcs.BatchUpdateLastConsumedTimeResp{}, nil
}
1 change: 1 addition & 0 deletions bcs-services/bcs-bscp/cmd/config-server/service/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (s *Service) Publish(ctx context.Context, req *pbcs.PublishReq) (
resp := &pbcs.PublishResp{
Id: rp.PublishedStrategyHistoryId,
HaveCredentials: rp.HaveCredentials,
HavePull: rp.HavePull,
}
return resp, nil
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/

package migrations

import (
"time"

"gorm.io/gorm"

"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/cmd/data-service/db-migration/migrator"
)

func init() {
// add current migration to migrator
migrator.GetMigrator().AddMigration(&migrator.Migration{
Version: "20240909174526",
Name: "20240909174526_modify_app",
Mode: migrator.GormMode,
Up: mig20240909174526Up,
Down: mig20240909174526Down,
})
}

// mig20240909174526Up for up migration
func mig20240909174526Up(tx *gorm.DB) error {
// Applications : applications
type Applications struct {
LastConsumedTime time.Time `gorm:"column:last_consumed_time;type:datetime(6)"`
}

// Applications add new column
if !tx.Migrator().HasColumn(&Applications{}, "last_consumed_time") {
if err := tx.Migrator().AddColumn(&Applications{}, "last_consumed_time"); err != nil {
return err
}
}

return nil
}

// mig20240909174526Down for down migration
func mig20240909174526Down(tx *gorm.DB) error {

// Applications : applications
type Applications struct {
LastConsumedTime time.Time `gorm:"column:last_consumed_time;type:datetime(6)"`
}

// Applications drop column
if tx.Migrator().HasColumn(&Applications{}, "last_consumed_time") {
if err := tx.Migrator().DropColumn(&Applications{}, "last_consumed_time"); err != nil {
return err
}
}

return nil
}
13 changes: 13 additions & 0 deletions bcs-services/bcs-bscp/cmd/data-service/service/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,3 +424,16 @@ func (s *Service) validateBizExist(kt *kit.Kit, bizID uint32) error {

return nil
}

// BatchUpdateLastConsumedTime 批量更新最后一次拉取时间
func (s *Service) BatchUpdateLastConsumedTime(ctx context.Context, req *pbds.BatchUpdateLastConsumedTimeReq) (
*pbds.BatchUpdateLastConsumedTimeResp, error) {
kit := kit.FromGrpcContext(ctx)

err := s.dao.App().BatchUpdateLastConsumedTime(kit, req.GetBizId(), req.GetAppIds())
if err != nil {
return nil, err
}

return &pbds.BatchUpdateLastConsumedTimeResp{}, nil
}
15 changes: 15 additions & 0 deletions bcs-services/bcs-bscp/cmd/data-service/service/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,20 @@ func (s *Service) Publish(ctx context.Context, req *pbds.PublishReq) (*pbds.Publ
}
return nil, err
}

app, err := s.dao.App().GetByID(grpcKit, req.AppId)
if err != nil {
if rErr := tx.Rollback(); rErr != nil {
logs.Errorf("transaction rollback failed, err: %v, rid: %s", rErr, grpcKit.Rid)
}
return nil, err
}

var havePull bool
if !app.Spec.LastConsumedTime.IsZero() {
havePull = true
}

haveCredentials, err := s.checkAppHaveCredentials(grpcKit, req.BizId, req.AppId)
if err != nil {
if rErr := tx.Rollback(); rErr != nil {
Expand All @@ -126,6 +140,7 @@ func (s *Service) Publish(ctx context.Context, req *pbds.PublishReq) (*pbds.Publ
resp := &pbds.PublishResp{
PublishedStrategyHistoryId: pshID,
HaveCredentials: haveCredentials,
HavePull: havePull,
}
return resp, nil
}
Expand Down
2 changes: 1 addition & 1 deletion bcs-services/bcs-bscp/cmd/feed-server/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ func (fs *feedServer) listenAndServe() error {
// generate standard grpc server grpcMetrics.
grpcMetrics := grpc_prometheus.NewServerMetrics()
grpcMetrics.EnableHandlingTimeHistogram(metrics.GrpcBuckets)

recoveryOpt := grpc_recovery.WithRecoveryHandlerContext(brpc.RecoveryHandlerFuncContext)

opts := []grpc.ServerOption{grpc.MaxRecvMsgSize(1 * 1024 * 1024),
Expand All @@ -137,6 +136,7 @@ func (fs *feedServer) listenAndServe() error {
brpc.LogUnaryServerInterceptor(),
grpcMetrics.UnaryServerInterceptor(),
service.FeedUnaryAuthInterceptor,
service.FeedUnaryUpdateLastConsumedTimeInterceptor,
grpc_recovery.UnaryServerInterceptor(recoveryOpt),
),
grpc.ChainStreamInterceptor(
Expand Down
12 changes: 12 additions & 0 deletions bcs-services/bcs-bscp/cmd/feed-server/bll/lcache/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,15 @@ func (ap *App) collectHitRate() {
}
}()
}

// BatchUpdateLastConsumedTime 批量更新服务拉取时间
func (ap *App) BatchUpdateLastConsumedTime(kt *kit.Kit, bizID uint32, appIDs []uint32) error {

if _, err := ap.cs.CS().BatchUpdateLastConsumedTime(kt.Ctx, &pbcs.BatchUpdateLastConsumedTimeReq{
BizId: bizID,
AppIds: appIDs,
}); err != nil {
return err
}
return nil
}
85 changes: 85 additions & 0 deletions bcs-services/bcs-bscp/cmd/feed-server/service/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import (

"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/criteria/constant"
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/kit"
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/logs"
pbfs "github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/protocol/feed-server"
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/runtime/jsoni"
sfs "github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/sf-share"
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/types"
)

Expand Down Expand Up @@ -120,6 +124,87 @@ func FeedUnaryAuthInterceptor(
return handler(ctx, req)
}

// FeedUnaryUpdateLastConsumedTimeInterceptor feed 更新拉取时间中间件
func FeedUnaryUpdateLastConsumedTimeInterceptor(ctx context.Context, req interface{},
info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {

type lastConsumedTime struct {
BizID uint32
AppNames []string
AppIDs []uint32
}

param := lastConsumedTime{}

switch info.FullMethod {
case pbfs.Upstream_GetKvValue_FullMethodName:
request := req.(*pbfs.GetKvValueReq)
param.BizID = request.BizId
param.AppNames = append(param.AppNames, request.GetAppMeta().App)
case pbfs.Upstream_PullKvMeta_FullMethodName:
request := req.(*pbfs.PullKvMetaReq)
param.BizID = request.BizId
param.AppNames = append(param.AppNames, request.GetAppMeta().App)
case pbfs.Upstream_Messaging_FullMethodName:
request := req.(*pbfs.MessagingMeta)
if sfs.MessagingType(request.Type) == sfs.VersionChangeMessage {
vc := new(sfs.VersionChangePayload)
if err := vc.Decode(request.Payload); err != nil {
logs.Errorf("version change message decoding failed, %s", err.Error())
return handler(ctx, req)
}
param.BizID = vc.BasicData.BizID
param.AppNames = append(param.AppNames, vc.Application.App)
}
case pbfs.Upstream_Watch_FullMethodName:
request := req.(*pbfs.SideWatchMeta)
payload := new(sfs.SideWatchPayload)
if err := jsoni.Unmarshal(request.Payload, payload); err != nil {
logs.Errorf("parse request payload failed, %s", err.Error())
return handler(ctx, req)
}
param.BizID = payload.BizID
for _, v := range payload.Applications {
param.AppNames = append(param.AppNames, v.App)
}
case pbfs.Upstream_PullAppFileMeta_FullMethodName:
request := req.(*pbfs.PullAppFileMetaReq)
param.BizID = request.BizId
param.AppNames = append(param.AppNames, request.GetAppMeta().App)
case pbfs.Upstream_GetDownloadURL_FullMethodName:
request := req.(*pbfs.GetDownloadURLReq)
param.BizID = request.BizId
param.AppIDs = append(param.AppIDs, request.GetFileMeta().GetConfigItemAttachment().AppId)
default:
return handler(ctx, req)
}

if param.BizID != 0 {
ctx = context.WithValue(ctx, constant.BizIDKey, param.BizID) //nolint:staticcheck
svr := info.Server.(*Service)

if len(param.AppIDs) == 0 {
for _, appName := range param.AppNames {
appID, err := svr.bll.AppCache().GetAppID(kit.FromGrpcContext(ctx), param.BizID, appName)
if err != nil {
logs.Errorf("get app id failed, err: %v", err)
return handler(ctx, req)
}
param.AppIDs = append(param.AppIDs, appID)
}
}

if err := svr.bll.AppCache().BatchUpdateLastConsumedTime(kit.FromGrpcContext(ctx),
param.BizID, param.AppIDs); err != nil {
logs.Errorf("batch update app last consumed failed, err: %v", err)
return handler(ctx, req)
}
logs.Infof("batch update app last consumed time success")
}

return handler(ctx, req)
}

// wrappedStream stream 封装, 可自定义 context 传值
type wrappedStream struct {
grpc.ServerStream
Expand Down
18 changes: 18 additions & 0 deletions bcs-services/bcs-bscp/pkg/dal/dao/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package dao
import (
"errors"
"fmt"
"time"

rawgen "gorm.io/gen"

Expand Down Expand Up @@ -51,6 +52,8 @@ type App interface {
ListAppMetaForCache(kt *kit.Kit, bizID uint32, appID []uint32) (map[ /*appID*/ uint32]*types.AppCacheMeta, error)
// GetByAlias 通过Alisa 查询
GetByAlias(kit *kit.Kit, bizID uint32, alias string) (*table.App, error)
// BatchUpdateLastConsumedTime 批量更新最后一次拉取时间
BatchUpdateLastConsumedTime(kit *kit.Kit, bizID uint32, appIDs []uint32) error
}

var _ App = new(appDao)
Expand All @@ -62,6 +65,21 @@ type appDao struct {
event Event
}

// BatchUpdateLastConsumedTime 批量更新最后一次拉取时间
func (dao *appDao) BatchUpdateLastConsumedTime(kit *kit.Kit, bizID uint32, appIDs []uint32) error {

m := dao.genQ.App

_, err := dao.genQ.App.WithContext(kit.Ctx).
Where(m.BizID.Eq(bizID), m.ID.In(appIDs...)).
Update(m.LastConsumedTime, time.Now().UTC())
if err != nil {
return err
}

return nil
}

// List app's detail info with the filter's expression.
func (dao *appDao) List(kit *kit.Kit, bizList []uint32, name, operator string, opt *types.BasePage) (
[]*table.App, int64, error) {
Expand Down
30 changes: 17 additions & 13 deletions bcs-services/bcs-bscp/pkg/dal/gen/applications.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit a4c14e5

Please sign in to comment.