Skip to content

Commit

Permalink
Fix/signature not match (#187)
Browse files Browse the repository at this point in the history
* retry when get SignatureDoesNotMatch

* add log level

* using multipart upload when size of file too large

* return error

---------

Co-authored-by: sapd <sapd@vccloud.vn>
  • Loading branch information
greatbn and sapd authored Sep 24, 2023
1 parent 599f0ce commit 7d892e7
Showing 1 changed file with 259 additions and 17 deletions.
276 changes: 259 additions & 17 deletions pkg/storage_vault/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package s3

import (
"bytes"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
Expand Down Expand Up @@ -54,6 +55,8 @@ func (s3 *S3) ID() (string, string) {
var _ storage_vault.StorageVault = (*S3)(nil)
var uploadKb, downloadKb int

var maxPartSize = int64(50 * 1024 * 1024)

func NewS3Default(vault backupapi.StorageVault, actionID string, limitUpload, limitDownload int, backupClient *backupapi.Client) (*S3, error) {
uploadKb, downloadKb = limitUpload, limitDownload

Expand Down Expand Up @@ -109,6 +112,7 @@ func NewS3Default(vault backupapi.StorageVault, actionID string, limitUpload, li
Endpoint: aws.String(vault.Credential.AwsLocation),
Region: aws.String(vault.Credential.Region),
S3ForcePathStyle: aws.Bool(true),
LogLevel: aws.LogLevel(aws.LogDebug),
HTTPClient: &http.Client{Transport: rt},
})))
s3.S3Session = sess
Expand Down Expand Up @@ -149,7 +153,7 @@ func (s3 *S3) VerifyObject(key string) (bool, bool, string, error) {
break
}
s3.logger.Sugar().Errorf("VerifyObject error: %s %s", aerr.Code(), aerr.Message())
if (aerr.Code() == "AccessDenied" || aerr.Code() == "Forbidden") && s3.Type().CredentialType == "DEFAULT" {
if (aerr.Code() == "AccessDenied" || aerr.Code() == "Forbidden" || aerr.Code() == "SignatureDoesNotMatch" ) && s3.Type().CredentialType == "DEFAULT" {
s3.logger.Sugar().Info("GetCredential in head object ", key)
storageVaultID, actID := s3.ID()
vault, err := s3.backupClient.GetCredentialStorageVault(storageVaultID, actID, nil)
Expand Down Expand Up @@ -187,32 +191,45 @@ func (s3 *S3) PutObject(key string, data []byte) error {
isExist, integrity, _, _ := s3.VerifyObject(key)
if isExist {
if !integrity {
_, err = s3.S3Session.PutObject(&storage.PutObjectInput{
Bucket: aws.String(s3.StorageBucket),
Key: aws.String(key),
Body: bytes.NewReader(data),
})
if int64(len(data)) > maxPartSize {
err = s3.putObjectMultiPart(key, data)
} else {
_, err = s3.S3Session.PutObject(&storage.PutObjectInput{
Bucket: aws.String(s3.StorageBucket),
Key: aws.String(key),
Body: bytes.NewReader(data),
})
}

if err == nil {
break
}
} else {
break
}
} else {
_, err = s3.S3Session.PutObject(&storage.PutObjectInput{
Bucket: aws.String(s3.StorageBucket),
Key: aws.String(key),
Body: bytes.NewReader(data),
})
if int64(len(data)) > maxPartSize {
err = s3.putObjectMultiPart(key, data)
} else {
_, err = s3.S3Session.PutObject(&storage.PutObjectInput{
Bucket: aws.String(s3.StorageBucket),
Key: aws.String(key),
Body: bytes.NewReader(data),
})
}
if !strings.Contains(key, "chunk.json") && !strings.Contains(key, "index.json") && !strings.Contains(key, "file.csv") {
isExist, integrity, _, _ = s3.VerifyObject(key)
if isExist {
if !integrity {
_, err = s3.S3Session.PutObject(&storage.PutObjectInput{
Bucket: aws.String(s3.StorageBucket),
Key: aws.String(key),
Body: bytes.NewReader(data),
})
if int64(len(data)) > maxPartSize {
err = s3.putObjectMultiPart(key, data)
} else {
_, err = s3.S3Session.PutObject(&storage.PutObjectInput{
Bucket: aws.String(s3.StorageBucket),
Key: aws.String(key),
Body: bytes.NewReader(data),
})
}
if err == nil {
break
}
Expand All @@ -227,7 +244,7 @@ func (s3 *S3) PutObject(key string, data []byte) error {
}
if aerr, ok := err.(awserr.Error); ok {
s3.logger.Sugar().Errorf("PutObject error: %s %s", aerr.Code(), aerr.Message())
if aerr.Code() == "AccessDenied" || aerr.Code() == "Forbidden" {
if aerr.Code() == "AccessDenied" || aerr.Code() == "Forbidden" || aerr.Code() == "SignatureDoesNotMatch" {
if once {
s3.logger.Error("Return false cause in put object: ", zap.Error(err), zap.String("code", aerr.Code()), zap.String("key", key))
return err
Expand All @@ -252,6 +269,47 @@ func (s3 *S3) PutObject(key string, data []byte) error {
return err
}


func (s3 *S3) putObjectMultiPart(key string, data []byte) (error) {
respMPU, err := s3.createMultiPartUpload(key)
if err != nil {
return err
}
var curr, partLength int64
var remaining = int64(len(data))
var completedParts []*storage.CompletedPart
partNumber := 1
for curr = 0; remaining != 0; curr += partLength {
if remaining < maxPartSize {
partLength = remaining
} else {
partLength = maxPartSize
}
completedPart, err := s3.uploadPart(respMPU, data[curr:curr+partLength], partNumber)
if err != nil {
fmt.Println(err.Error())
err := s3.abortMultiPartUpload(respMPU)
if err != nil {
s3.logger.Sugar().Error(err.Error())
return err
}
return nil
}
remaining -= partLength
partNumber++
completedParts = append(completedParts, completedPart)
}

completeResponse, err := s3.completeMultiPartUpload(respMPU, completedParts)
if err != nil {
s3.logger.Sugar().Error(err.Error())
return err
}

s3.logger.Sugar().Info("Successfully uploaded file: %s\n", completeResponse.String())
return nil
}

func (s3 *S3) GetObject(key string) ([]byte, error) {
var err error
var once bool
Expand Down Expand Up @@ -350,6 +408,190 @@ func (s3 *S3) HeadObject(key string) (bool, string, error) {
return false, "", err
}


func (s3 *S3) createMultiPartUpload(key string) (*storage.CreateMultipartUploadOutput, error) {
var err error
var once bool
bo := backoff.NewExponentialBackOff()
bo.MaxInterval = maxRetry
bo.MaxElapsedTime = maxRetry
for {
resp, err := s3.S3Session.CreateMultipartUpload(&storage.CreateMultipartUploadInput{
Bucket: aws.String(s3.StorageBucket),
Key: aws.String(key),
})
if err == nil {
s3.logger.Sugar().Info("Created MultiPartUpload for ", key)
return resp, nil
}

if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() == "NotFound" {
return nil, err
}

s3.logger.Sugar().Errorf("CreateMultipartUpload error: %s %s", aerr.Code(), aerr.Message())
if aerr.Code() == "AccessDenied" || aerr.Code() == "Forbidden" {
if once {
s3.logger.Error("Return false cause in CreateMultipartUpload object: ", zap.Error(err), zap.String("code", aerr.Code()), zap.String("key", key))
return nil, err
}
s3.logger.Sugar().Info("CreateMultipartUpload one more time ", key)
once = true
rand.Seed(time.Now().UnixNano())
n := rand.Intn(3) // n will be between 0 and 10
time.Sleep(time.Duration(n) * time.Second)
}
}
s3.logger.Debug("CreateMultipartUpload error. Retrying")
d := bo.NextBackOff()
if d == backoff.Stop {
s3.logger.Debug("CreateMultipartUpload error. Retry time out", zap.Error(err))
break
}
s3.logger.Sugar().Info("CreateMultipartUpload error. Retry in ", d)
time.Sleep(d)

}
return nil, err
}


func (s3 *S3) completeMultiPartUpload( mpuOut *storage.CreateMultipartUploadOutput, parts []*storage.CompletedPart) (*storage.CompleteMultipartUploadOutput, error) {
var err error
var once bool
bo := backoff.NewExponentialBackOff()
bo.MaxInterval = maxRetry
bo.MaxElapsedTime = maxRetry
for {
resp, err := s3.S3Session.CompleteMultipartUpload(&storage.CompleteMultipartUploadInput{
Bucket: aws.String(s3.StorageBucket),
Key: mpuOut.Key,
UploadId: mpuOut.UploadId,
MultipartUpload: &storage.CompletedMultipartUpload{
Parts: parts,
},
})
if err == nil {
s3.logger.Sugar().Info("Completed Multipart Upload ", mpuOut.Key)
return resp, nil
}

if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() == "NotFound" {
return nil, err
}

s3.logger.Sugar().Errorf("CompleteMultipartUpload error: %s %s", aerr.Code(), aerr.Message())
if aerr.Code() == "AccessDenied" || aerr.Code() == "Forbidden" {
if once {
s3.logger.Error("Return false cause in CompleteMultipartUpload: ", zap.Error(err), zap.String("code", aerr.Code()), zap.String("key", *mpuOut.Key))
return nil, err
}
s3.logger.Sugar().Info("CompleteMultipartUpload one more time ", mpuOut.Key)
once = true
rand.Seed(time.Now().UnixNano())
n := rand.Intn(3) // n will be between 0 and 10
time.Sleep(time.Duration(n) * time.Second)
}
}
s3.logger.Debug("CompleteMultipartUpload error. Retrying")
d := bo.NextBackOff()
if d == backoff.Stop {
s3.logger.Debug("CompleteMultipartUpload error. Retry time out", zap.Error(err))
break
}
s3.logger.Sugar().Info("CompleteMultipartUpload error. Retry in ", d)
time.Sleep(d)

}
return nil, err
}


func (s3 *S3) abortMultiPartUpload(mpuOut *storage.CreateMultipartUploadOutput) (error) {
var err error
var once bool
bo := backoff.NewExponentialBackOff()
bo.MaxInterval = maxRetry
bo.MaxElapsedTime = maxRetry
for {
_, err := s3.S3Session.AbortMultipartUpload(&storage.AbortMultipartUploadInput{
Bucket: aws.String(s3.StorageBucket),
Key: mpuOut.Key,
UploadId: mpuOut.UploadId,
})
if err == nil {
s3.logger.Sugar().Info("AbortMultipartUpload Upload ", mpuOut.Key)
return nil
}

if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() == "NotFound" {
return err
}

s3.logger.Sugar().Errorf("AbortMultipartUpload error: %s %s", aerr.Code(), aerr.Message())
if aerr.Code() == "AccessDenied" || aerr.Code() == "Forbidden" {
if once {
s3.logger.Error("Return false cause in AbortMultipartUpload: ", zap.Error(err), zap.String("code", aerr.Code()), zap.String("key", *mpuOut.Key))
return err
}
s3.logger.Sugar().Info("AbortMultipartUpload one more time ", mpuOut.Key)
once = true
rand.Seed(time.Now().UnixNano())
n := rand.Intn(3) // n will be between 0 and 10
time.Sleep(time.Duration(n) * time.Second)
}
}
s3.logger.Debug("AbortMultipartUpload error. Retrying")
d := bo.NextBackOff()
if d == backoff.Stop {
s3.logger.Debug("AbortMultipartUpload error. Retry time out", zap.Error(err))
break
}
s3.logger.Sugar().Info("AbortMultipartUpload error. Retry in ", d)
time.Sleep(d)

}
return err
}

func (s3 *S3) uploadPart(resp *storage.CreateMultipartUploadOutput, fileBytes []byte, partNum int) (*storage.CompletedPart, error) {
tryNum := 1
maxRetries := 3
partInput := &storage.UploadPartInput{
Body: bytes.NewReader(fileBytes),
Bucket: resp.Bucket,
Key: resp.Key,
PartNumber: aws.Int64(int64(partNum)),
UploadId: resp.UploadId,
ContentLength: aws.Int64(int64(len(fileBytes))),
}

for tryNum <= maxRetries {
uploadResult, err := s3.S3Session.UploadPart(partInput)
if err != nil {
if tryNum == maxRetries {
if aerr, ok := err.(awserr.Error); ok {
return nil, aerr
}
return nil, err
}
s3.logger.Sugar().Info("Retrying to upload part #%v\n", partNum)
tryNum++
} else {
s3.logger.Sugar().Info("Uploaded part #%v\n", partNum)
return &storage.CompletedPart{
ETag: uploadResult.ETag,
PartNumber: aws.Int64(int64(partNum)),
}, nil
}
}
return nil, nil
}


func (s3 *S3) RefreshCredential(credential storage_vault.Credential) error {
cred := credentials.NewStaticCredentials(credential.AwsAccessKeyId, credential.AwsSecretAccessKey, credential.Token)
_, err := cred.Get()
Expand Down

0 comments on commit 7d892e7

Please sign in to comment.