Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/TencentBlueKing/bk-bcs in…
Browse files Browse the repository at this point in the history
…to example-0719
  • Loading branch information
q15971095971 committed Jul 20, 2024
2 parents e4df0e9 + c85792e commit 0163e92
Show file tree
Hide file tree
Showing 30 changed files with 818 additions and 257 deletions.
28 changes: 28 additions & 0 deletions bcs-services/bcs-bscp/cmd/feed-server/etc/feed_server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,34 @@ downstream:
burst: 500
waitTimeMil: 50

# feed server's rate limiter related settings.
rateLimiter:
# 是否启用限流器,默认为false(关闭)
enable:
# clientBandwidth为单个客户端下载文件所评估的可用带宽,用于决定单次下载流控阈值,单位为MB/s,默认为50(50MB/s=400Mb/s)
clientBandwidth:
# global为全局限流器配置,整个feed-server粒度
global:
# limit为流量速率限制,单位为MB/s,默认为1000(1000MB/s=8000Mb/s)
limit:
# burst为允许处理的突发流量上限(允许系统在短时间内处理比速率限制更多的流量),单位为MB,默认为2000(2000MB=16000Mb)
burst:
# biz为业务粒度限流器配置
biz:
# 业务默认配置(优先级低于显示设置的业务配置)
default:
# limit为流量速率限制,单位为MB/s,默认为100(100MB/s=800Mb/s)
limit:
# burst为允许处理的突发流量上限(允许系统在短时间内处理比速率限制更多的流量),单位为MB,默认为200(200MB=1600Mb)
burst:
# 显示设置的业务配置
spec:
2:
# limit为流量速率限制,单位为MB/s
limit:
# burst为允许处理的突发流量上限(允许系统在短时间内处理比速率限制更多的流量),单位为MB
burst:

# feed server's local cache related settings.
# Note:
# 1. These configurations depend on you host's in-memory cache size, the larger the value of these
Expand Down
37 changes: 37 additions & 0 deletions bcs-services/bcs-bscp/cmd/feed-server/service/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,31 @@ func initMetric(name string) *metric {
}, []string{"bizID", "appName"})
metrics.Register().MustRegister(m.clientCurrentMemUsage)

m.downloadTotalSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.FSConfigConsume,
Name: "file_download_total_size_bytes",
Help: "Total size of files downloaded, biz 0 means global",
ConstLabels: labels,
}, []string{"bizID"})
metrics.Register().MustRegister(m.downloadTotalSize)
m.downloadDelayRequests = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.FSConfigConsume,
Name: "file_download_delay_requests",
Help: "Total number of downloaded file delayed requests, biz 0 means global",
ConstLabels: labels,
}, []string{"bizID"})
metrics.Register().MustRegister(m.downloadDelayRequests)
m.downloadDelayMilliseconds = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.FSConfigConsume,
Name: "file_download_delay_milliseconds",
Help: "Delay milliseconds of downloaded file, biz 0 means global",
ConstLabels: labels,
}, []string{"bizID"})
metrics.Register().MustRegister(m.downloadDelayMilliseconds)

return m
}

Expand All @@ -84,6 +109,7 @@ type metric struct {
// watchCounter record the total connection count of sidecars with watch, used to get the new the connection count
// within a specified time range.
watchCounter *prometheus.CounterVec

// clientMaxCPUUsage The maximum cpu usage of the client was collected
clientMaxCPUUsage *prometheus.GaugeVec
// clientMaxMemUsage the maximum memory usage was collected
Expand All @@ -92,4 +118,15 @@ type metric struct {
clientCurrentCPUUsage *prometheus.GaugeVec
// clientCurrentMemUsage the current memory usage of the client is collected
clientCurrentMemUsage *prometheus.GaugeVec

downloadTotalSize *prometheus.GaugeVec
downloadDelayRequests *prometheus.GaugeVec
downloadDelayMilliseconds *prometheus.GaugeVec
}

// collectDownload collects metrics for download
func (m *metric) collectDownload(biz string, totalSize, delayRequests, delayMilliseconds int64) {
m.downloadTotalSize.With(prm.Labels{"bizID": biz}).Set(float64(totalSize))
m.downloadDelayRequests.With(prm.Labels{"bizID": biz}).Set(float64(delayRequests))
m.downloadDelayMilliseconds.With(prm.Labels{"bizID": biz}).Set(float64(delayMilliseconds))
}
33 changes: 32 additions & 1 deletion bcs-services/bcs-bscp/cmd/feed-server/service/rpc_sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
pbbase "github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/protocol/core/base"
pbkv "github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/protocol/core/kv"
pbfs "github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/protocol/feed-server"
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/ratelimiter"
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/runtime/jsoni"
sfs "github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/sf-share"
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/tools"
Expand Down Expand Up @@ -456,7 +457,37 @@ func (s *Service) GetDownloadURL(ctx context.Context, req *pbfs.GetDownloadURLRe
return nil, status.Errorf(codes.Aborted, "generate temp download url failed, %s", err.Error())
}

return &pbfs.GetDownloadURLResp{Url: downloadLink}, nil
if !s.rl.Enable() {
return &pbfs.GetDownloadURLResp{Url: downloadLink, WaitTimeMil: 0}, nil
}
// 对于单个大文件下载,受限于单个客户端和服务端之间的带宽(比如为10MB/s=80Mb/s),而在存储服务端支持更高带宽的情况下(比如100MB/s),
// 哪怕单个文件2GB,在限流器阈值比单个客户端下载带宽高的情况下,还是应该允许其他客户端去存储服务端下载,
// 超过客户端下载带宽的大文件暂且按客户端带宽计入流控
// 多个大文件的持续下载,会影响到限流器流控的精确性,当前暂时只计入每个大文件首次一秒内的流控情况,后续的流量消耗不计入
var gWaitTimeMil, bWaitTimeMil int64
bandwidth := int(s.rl.ClientBandwidth()) * ratelimiter.MB
if int(req.FileMeta.CommitSpec.Content.ByteSize) > bandwidth {
gWaitTimeMil = s.rl.Global().WaitTimeMil(bandwidth)
bWaitTimeMil = s.rl.UseBiz(uint(req.BizId)).WaitTimeMil(bandwidth)
} else {
gWaitTimeMil = s.rl.Global().WaitTimeMil(int(req.FileMeta.CommitSpec.Content.ByteSize))
bWaitTimeMil = s.rl.UseBiz(uint(req.BizId)).WaitTimeMil(int(req.FileMeta.CommitSpec.Content.ByteSize))
}

// 分别统计全局和业务粒度流控情况
gs := s.rl.Global().Stats()
s.mc.collectDownload("0", gs.TotalByteSize, gs.DelayCnt, gs.DelayMilliseconds)
bs := s.rl.UseBiz(uint(req.BizId)).Stats()
logs.V(1).Infof("biz: %d, download file total byte size:%d, delay cnt:%d, delay milliseconds:%d",
req.BizId, bs.TotalByteSize, bs.DelayCnt, bs.DelayMilliseconds)
s.mc.collectDownload(fmt.Sprintf("%d", req.BizId), bs.TotalByteSize, bs.DelayCnt, bs.DelayMilliseconds)

// 优先使用流控时间长的
wt := bWaitTimeMil
if bWaitTimeMil < gWaitTimeMil {
wt = gWaitTimeMil
}
return &pbfs.GetDownloadURLResp{Url: downloadLink, WaitTimeMil: wt}, nil
}

// PullKvMeta pull an app's latest release metadata only when the app's configures is kv type.
Expand Down
6 changes: 6 additions & 0 deletions bcs-services/bcs-bscp/cmd/feed-server/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/dal/repository"
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/iam/auth"
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/logs"
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/ratelimiter"
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/rest"
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/runtime/handler"
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/runtime/shutdown"
Expand All @@ -51,6 +52,7 @@ type Service struct {
name string
mc *metric
gwMux *runtime.ServeMux
rl *ratelimiter.RL
}

// NewService create a service instance.
Expand Down Expand Up @@ -81,6 +83,9 @@ func NewService(sd serviced.Discover, name string) (*Service, error) {
return nil, fmt.Errorf("new repository provider failed, err: %v", err)
}

rl := ratelimiter.New(cc.FeedServer().RateLimiter)
logs.Infof("init rate limiter, conf: %+v", cc.FeedServer().RateLimiter)

return &Service{
bll: bl,
authorizer: authorizer,
Expand All @@ -89,6 +94,7 @@ func NewService(sd serviced.Discover, name string) (*Service, error) {
provider: provider,
mc: initMetric(name),
gwMux: gwMux,
rl: rl,
}, nil
}

Expand Down
6 changes: 6 additions & 0 deletions bcs-services/bcs-bscp/pkg/cc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ type FeedServerSetting struct {
FSLocalCache FSLocalCache `yaml:"fsLocalCache"`
Downstream Downstream `yaml:"downstream"`
MRLimiter MatchReleaseLimiter `yaml:"matchReleaseLimiter"`
RateLimiter RateLimiter `yaml:"rateLimiter"`
}

// trySetFlagBindIP try set flag bind ip.
Expand All @@ -394,6 +395,7 @@ func (s *FeedServerSetting) trySetDefault() {
s.GSE.trySetDefault()
s.RedisCluster.trySetDefault()
s.MRLimiter.trySetDefault()
s.RateLimiter.trySetDefault()
}

// Validate FeedServerSetting option.
Expand Down Expand Up @@ -423,6 +425,10 @@ func (s FeedServerSetting) Validate() error {
return err
}

if err := s.RateLimiter.validate(); err != nil {
return err
}

if err := s.Esb.validate(); err != nil {
return err
}
Expand Down
98 changes: 98 additions & 0 deletions bcs-services/bcs-bscp/pkg/cc/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1098,6 +1098,104 @@ func (lm *MatchReleaseLimiter) trySetDefault() {
}
}

// RateLimiter defines the rate limiter options for traffic control.
type RateLimiter struct {
Enable bool `yaml:"enable"`
ClientBandwidth uint `yaml:"clientBandwidth"`
Global BasicRL `yaml:"global"`
Biz BizRLs `yaml:"biz"`
}

// BizRLs defines the rate limiters for biz
type BizRLs struct {
Default BasicRL `yaml:"default"`
Spec map[uint]BasicRL `yaml:"spec"`
}

// BasicRL defines the basic options for rate limiter.
type BasicRL struct {
Limit uint `yaml:"limit"`
Burst uint `yaml:"burst"`
}

const (
// DefaultClientBandwidth default client bandwidth
DefaultClientBandwidth = 50 // 50MB/s = 400Mb/s
// DefaultGlobalRateLimit default global rate limit
DefaultGlobalRateLimit = 1000 // 1000MB/s = 8000Mb/s
// DefaultGlobalRateBurst default global rate burst
DefaultGlobalRateBurst = 2000 // 2000MB = 16000Mb
// DefaultBizRateLimit default biz rate limit
DefaultBizRateLimit = 100 // 100MB/s = 800Mb/s
// DefaultBizRateBurst default biz rate burst
DefaultBizRateBurst = 200 // 200MB = 1600Mb
)

// validate if the rate limiter is valid or not.
func (rl RateLimiter) validate() error {
if rl.Biz.Default.Burst < rl.Biz.Default.Limit {
return fmt.Errorf("invalid rateLimiter.biz.default.burst value %d, should >= rateLimiter.biz.default.limit "+
"value %d", rl.Global.Burst, rl.Global.Limit)
}

if rl.Global.Limit < rl.Biz.Default.Limit {
return fmt.Errorf("invalid rateLimiter.global.limit value %d, should >= rateLimiter.biz.default.limit value %d",
rl.Global.Limit, rl.ClientBandwidth)
}

if rl.Global.Burst < rl.Biz.Default.Burst {
return fmt.Errorf("invalid rateLimiter.global.burst value %d, should >= rateLimiter.biz.default.burst value %d",
rl.Global.Burst, rl.Global.Limit)
}

for bizID, l := range rl.Biz.Spec {
if l.Burst < l.Limit {
return fmt.Errorf("invalid rateLimiter.biz.spec.%d.burst value %d, "+
"should >= rateLimiter.biz.spec.%d.limit value %d", bizID, l.Burst, bizID, l.Limit)
}
}

return nil
}

// trySetDefault try set the default value of rate limiter
func (rl *RateLimiter) trySetDefault() {
if rl.ClientBandwidth == 0 {
rl.ClientBandwidth = DefaultClientBandwidth
}

if rl.Global.Limit == 0 {
rl.Global.Limit = DefaultGlobalRateLimit
}

if rl.Global.Burst == 0 {
rl.Global.Burst = DefaultGlobalRateBurst
}

if rl.Biz.Default.Limit == 0 {
rl.Biz.Default.Limit = DefaultBizRateLimit
}

if rl.Biz.Default.Burst == 0 {
rl.Biz.Default.Burst = DefaultBizRateBurst
}

for bizID, l := range rl.Biz.Spec {
if l.Limit == 0 {
rl.Biz.Spec[bizID] = BasicRL{
Limit: DefaultBizRateLimit,
Burst: l.Burst,
}
}
if l.Burst == 0 {
rl.Biz.Spec[bizID] = BasicRL{
Limit: rl.Biz.Spec[bizID].Limit,
Burst: DefaultBizRateBurst,
}
}
}
}

// Credential credential encryption algorithm and master key
type Credential struct {
MasterKey string `yaml:"master_key"`
Expand Down
Loading

0 comments on commit 0163e92

Please sign in to comment.