Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix DescribeTags retry on high-frequency and delay refresh interval #1344

Merged
merged 2 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 27 additions & 15 deletions extension/entitystore/serviceprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/arn"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"

Expand All @@ -39,8 +40,13 @@ const (
ServiceNameSourceUnknown = "Unknown"
ServiceNameSourceUserConfiguration = "UserConfiguration"

jitterMax = 180
jitterMin = 60
describeTagsJitterMax = 3600
describeTagsJitterMin = 3000
zhihonl marked this conversation as resolved.
Show resolved Hide resolved
defaultJitterMin = 60
defaultJitterMax = 180
maxRetry = 3
infRetry = -1
RequestLimitExceeded = "RequestLimitExceeded"
zhihonl marked this conversation as resolved.
Show resolved Hide resolved
)

var (
Expand Down Expand Up @@ -86,10 +92,10 @@ type serviceprovider struct {
// startServiceProvider launches the background refresh loops that populate the
// service provider's state.
//
// The EC2 client is created synchronously first; only when that attempt fails
// is a one-time refreshLoop started, which returns after its first success.
// The IAM role loop uses the default jitter window
// (defaultJitterMin..defaultJitterMax seconds) and ec2tagger.BackoffSleepArray
// with no retry limit (infRetry). The EC2 tag service-name loop uses the longer
// describeTags jitter window and ec2tagger.ThrottleBackOffArray; refreshLoop
// keeps retrying it only for RequestLimitExceeded errors, at most maxRetry
// times.
func (s *serviceprovider) startServiceProvider() {
	if err := s.getEC2Client(); err != nil {
		go refreshLoop(s.done, s.getEC2Client, true, "", defaultJitterMin, defaultJitterMax, ec2tagger.BackoffSleepArray, infRetry)
	}
	go refreshLoop(s.done, s.getIAMRole, false, "", defaultJitterMin, defaultJitterMax, ec2tagger.BackoffSleepArray, infRetry)
	go refreshLoop(s.done, s.getEC2TagServiceName, false, RequestLimitExceeded, describeTagsJitterMin, describeTagsJitterMax, ec2tagger.ThrottleBackOffArray, maxRetry)
}

// addEntryForLogFile adds an association between a log file glob and a service attribute, as configured in the
Expand Down Expand Up @@ -242,7 +248,7 @@ func (s *serviceprovider) getEC2TagServiceName() error {
}
result, err := s.ec2API.DescribeTags(input)
if err != nil {
continue
return err
zhihonl marked this conversation as resolved.
Show resolved Hide resolved
}
for _, tag := range result.Tags {
key := *tag.Key
Expand Down Expand Up @@ -307,23 +313,28 @@ func newServiceProvider(mode string, region string, ec2Info *ec2Info, metadataPr
}
}

func refreshLoop(done chan struct{}, updateFunc func() error, oneTime bool) {
func refreshLoop(done chan struct{}, updateFunc func() error, oneTime bool, retryOnError string, successRetryMin int, successRetryMax int, backoffArray []time.Duration, maxRetry int) int {
zhihonl marked this conversation as resolved.
Show resolved Hide resolved
// Offset retry by 1 so we can start with 1 minute wait time
// instead of immediately retrying
retry := 1
for {
if maxRetry != -1 && retry > maxRetry {
return retry
}
err := updateFunc()
if err == nil && oneTime {
return
return retry
} else if awsErr, ok := err.(awserr.Error); ok && retryOnError != "" && awsErr.Code() != retryOnError {
return retry
}

waitDuration := calculateWaitTime(retry, err)
waitDuration := calculateWaitTime(retry-1, err, successRetryMin, successRetryMax, backoffArray)
wait := time.NewTimer(waitDuration)
select {
case <-done:
log.Printf("D! serviceprovider: Shutting down now")
wait.Stop()
return
return retry
case <-wait.C:
}

Expand All @@ -339,20 +350,21 @@ func refreshLoop(done chan struct{}, updateFunc func() error, oneTime bool) {
}

}
return retry
}

// calculateWaitTime returns how long the caller should wait before invoking
// its update function again.
//
// When err is nil the call succeeded, so the result is a random refresh
// interval in [successRetryMin, successRetryMax) seconds. If that range is
// empty (successRetryMax <= successRetryMin) the result is exactly
// successRetryMin seconds.
//
// When err is non-nil the result is backoffArray[retry]. An index past the end
// is clamped to the last entry, a negative index is clamped to the first, and
// an empty backoffArray yields a zero wait.
func calculateWaitTime(retry int, err error, successRetryMin int, successRetryMax int, backoffArray []time.Duration) time.Duration {
	if err == nil {
		jitterRange := successRetryMax - successRetryMin
		if jitterRange <= 0 {
			// rand.Intn panics for a non-positive argument.
			return time.Duration(successRetryMin) * time.Second
		}
		return time.Duration(rand.Intn(jitterRange)+successRetryMin) * time.Second
	}

	if len(backoffArray) == 0 {
		return 0
	}
	if retry < 0 {
		retry = 0
	}
	if last := len(backoffArray) - 1; retry > last {
		retry = last
	}
	return backoffArray[retry]
}
75 changes: 71 additions & 4 deletions extension/entitystore/serviceprovider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,40 @@ import (
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/stretchr/testify/assert"

configaws "github.com/aws/amazon-cloudwatch-agent/cfg/aws"
"github.com/aws/amazon-cloudwatch-agent/internal/ec2metadataprovider"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/ec2tagger"
"github.com/aws/amazon-cloudwatch-agent/translator/config"
)

// mockServiceNameEC2Client is a test double for ec2iface.EC2API. It embeds the
// interface so that only the methods a test needs have to be overridden.
type mockServiceNameEC2Client struct {
	ec2iface.EC2API

	// throttleError makes the mocked DescribeTags fail with a
	// RequestLimitExceeded error; authError makes it fail with an
	// UnauthorizedOperation error. throttleError is checked first.
	throttleError, authError bool
}

// Fixtures returned by the mocked DescribeTags API: a single tag whose key is
// "service" and whose value is "test-service".
var (
	tagKeyService = "service"
	tagValService = "test-service"
	tagDesService = ec2.TagDescription{
		Key:   &tagKeyService,
		Value: &tagValService,
	}
)

// FastBackOffArray is a three-step backoff schedule with zero delay at every
// step, so tests can exercise retries without waiting.
var FastBackOffArray = []time.Duration{0, 0, 0}

func (m *mockServiceNameEC2Client) DescribeTags(*ec2.DescribeTagsInput) (*ec2.DescribeTagsOutput, error) {
if m.throttleError {
return nil, awserr.New(RequestLimitExceeded, "throttle limit exceeded", nil)
}
if m.authError {
return nil, awserr.New("UnauthorizedOperation", "UnauthorizedOperation occurred", nil)
}
testTags := ec2.DescribeTagsOutput{
NextToken: nil,
Tags: []*ec2.TagDescription{&tagDesService},
Expand Down Expand Up @@ -365,7 +377,6 @@ func Test_refreshLoop(t *testing.T) {
ec2API ec2iface.EC2API
iamRole string
ec2TagServiceName string
refreshInterval time.Duration
oneTime bool
}
type expectedInfo struct {
Expand All @@ -387,7 +398,6 @@ func Test_refreshLoop(t *testing.T) {
ec2API: &mockServiceNameEC2Client{},
iamRole: "original-role",
ec2TagServiceName: "original-tag-name",
refreshInterval: time.Millisecond,
},
expectedInfo: expectedInfo{
iamRole: "TestRole",
Expand All @@ -408,12 +418,69 @@ func Test_refreshLoop(t *testing.T) {
ec2TagServiceName: tt.fields.ec2TagServiceName,
done: done,
}
go refreshLoop(done, s.getEC2TagServiceName, tt.fields.oneTime)
go refreshLoop(done, s.getIAMRole, tt.fields.oneTime)
go refreshLoop(done, s.getEC2TagServiceName, tt.fields.oneTime, RequestLimitExceeded, describeTagsJitterMin, describeTagsJitterMax, ec2tagger.ThrottleBackOffArray, maxRetry)
go refreshLoop(done, s.getIAMRole, tt.fields.oneTime, "", defaultJitterMin, defaultJitterMax, ec2tagger.BackoffSleepArray, infRetry)
time.Sleep(time.Second)
close(done)
assert.Equal(t, tt.expectedInfo.iamRole, s.iamRole)
assert.Equal(t, tt.expectedInfo.ec2TagServiceName, s.ec2TagServiceName)
})
}
}

// Test_refreshLoopRetry checks the retry counter refreshLoop returns when
// getEC2TagServiceName keeps failing.
//
//   - A RequestLimitExceeded (throttle) error is retried; the test expects the
//     returned counter to be 4, one past the maxRetry limit of 3.
//   - Any other AWS error (here UnauthorizedOperation) makes refreshLoop return
//     immediately, so the counter is still at its initial value of 1.
//
// refreshLoop is called synchronously with a zero-delay backoff schedule, so
// no sleeping is needed before asserting on its result.
func Test_refreshLoopRetry(t *testing.T) {
	type fields struct {
		metadataProvider ec2metadataprovider.MetadataProvider
		ec2API           ec2iface.EC2API
		oneTime          bool
	}
	tests := []struct {
		name          string
		fields        fields
		expectedRetry int
	}{
		{
			name: "ThrottleLimitError",
			fields: fields{
				metadataProvider: &mockMetadataProvider{
					InstanceIdentityDocument: &ec2metadata.EC2InstanceIdentityDocument{
						InstanceID: "i-123456789"},
				},
				ec2API: &mockServiceNameEC2Client{
					throttleError: true,
				},
			},
			expectedRetry: 4,
		},
		{
			name: "AuthError",
			fields: fields{
				metadataProvider: &mockMetadataProvider{
					InstanceIdentityDocument: &ec2metadata.EC2InstanceIdentityDocument{
						InstanceID: "i-123456789"},
				},
				ec2API: &mockServiceNameEC2Client{
					authError: true,
				},
			},
			expectedRetry: 1,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			done := make(chan struct{})
			defer close(done)

			s := &serviceprovider{
				metadataProvider: tt.fields.metadataProvider,
				ec2API:           tt.fields.ec2API,
				// Parameters are unnamed so they do not shadow s or the
				// imported config package.
				ec2Provider: func(string, *configaws.CredentialConfig) ec2iface.EC2API {
					return tt.fields.ec2API
				},
				done: done,
			}

			retry := refreshLoop(done, s.getEC2TagServiceName, tt.fields.oneTime, RequestLimitExceeded, describeTagsJitterMin, describeTagsJitterMax, FastBackOffArray, maxRetry)
			assert.Equal(t, tt.expectedRetry, retry)
		})
	}
}
1 change: 1 addition & 0 deletions plugins/processors/ec2tagger/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,6 @@ const (
var (
	// Newer versions of the SDK take longer when the hop limit is 1 in EKS.
	defaultRefreshInterval = 180 * time.Second
	// ThrottleBackOffArray is a short backoff schedule: no wait, then 1 minute,
	// then 3 minutes. In this change it is passed to the entitystore refresh
	// loop that retries DescribeTags on RequestLimitExceeded.
	// NOTE(review): the trailing comment below is identical to the one on
	// BackoffSleepArray (describe instances, 10 minutes) and does not match
	// this schedule, whose longest wait is 3 minutes. Confirm the rationale.
	ThrottleBackOffArray = []time.Duration{0, 1 * time.Minute, 3 * time.Minute} // backoff retry for ec2 describe instances API call. Assuming the throttle limit is 20 per second. 10 mins allow 12000 API calls.
	// BackoffSleepArray is the default backoff schedule: no wait, then growing
	// waits of 1, 3 and finally 10 minutes between retries.
	BackoffSleepArray = []time.Duration{0, 1 * time.Minute, 1 * time.Minute, 3 * time.Minute, 3 * time.Minute, 3 * time.Minute, 10 * time.Minute} // backoff retry for ec2 describe instances API call. Assuming the throttle limit is 20 per second. 10 mins allow 12000 API calls.
)
Loading