diff --git a/pkg/storage_vault/s3/s3.go b/pkg/storage_vault/s3/s3.go
index d2db3a2..0cb7262 100644
--- a/pkg/storage_vault/s3/s3.go
+++ b/pkg/storage_vault/s3/s3.go
@@ -2,6 +2,7 @@ package s3
 
 import (
 	"bytes"
+	"fmt"
 	"io/ioutil"
 	"math/rand"
 	"net/http"
@@ -54,6 +55,8 @@ func (s3 *S3) ID() (string, string) {
 var _ storage_vault.StorageVault = (*S3)(nil)
 var uploadKb, downloadKb int
 
+var maxPartSize = int64(50 * 1024 * 1024)
+
 func NewS3Default(vault backupapi.StorageVault, actionID string, limitUpload, limitDownload int, backupClient *backupapi.Client) (*S3, error) {
 	uploadKb, downloadKb = limitUpload, limitDownload
 
@@ -109,6 +112,7 @@ func NewS3Default(vault backupapi.StorageVault, actionID string, limitUpload, li
 		Endpoint:         aws.String(vault.Credential.AwsLocation),
 		Region:           aws.String(vault.Credential.Region),
 		S3ForcePathStyle: aws.Bool(true),
+		LogLevel:         aws.LogLevel(aws.LogDebug),
 		HTTPClient:       &http.Client{Transport: rt},
 	})))
 	s3.S3Session = sess
@@ -149,7 +153,7 @@ func (s3 *S3) VerifyObject(key string) (bool, bool, string, error) {
 			break
 		}
 		s3.logger.Sugar().Errorf("VerifyObject error: %s %s", aerr.Code(), aerr.Message())
-		if (aerr.Code() == "AccessDenied" || aerr.Code() == "Forbidden") && s3.Type().CredentialType == "DEFAULT" {
+		if (aerr.Code() == "AccessDenied" || aerr.Code() == "Forbidden" || aerr.Code() == "SignatureDoesNotMatch") && s3.Type().CredentialType == "DEFAULT" {
 			s3.logger.Sugar().Info("GetCredential in head object ", key)
 			storageVaultID, actID := s3.ID()
 			vault, err := s3.backupClient.GetCredentialStorageVault(storageVaultID, actID, nil)
@@ -187,11 +191,16 @@ func (s3 *S3) PutObject(key string, data []byte) error {
 	isExist, integrity, _, _ := s3.VerifyObject(key)
 	if isExist {
 		if !integrity {
-			_, err = s3.S3Session.PutObject(&storage.PutObjectInput{
-				Bucket: aws.String(s3.StorageBucket),
-				Key:    aws.String(key),
-				Body:   bytes.NewReader(data),
-			})
+			if int64(len(data)) > maxPartSize {
+				err = s3.putObjectMultiPart(key, data)
+			} else {
+				_, err = s3.S3Session.PutObject(&storage.PutObjectInput{
+					Bucket: aws.String(s3.StorageBucket),
+					Key:    aws.String(key),
+					Body:   bytes.NewReader(data),
+				})
+			}
+
 			if err == nil {
 				break
 			}
@@ -199,20 +208,28 @@ func (s3 *S3) PutObject(key string, data []byte) error {
 				break
 			}
 		} else {
-			_, err = s3.S3Session.PutObject(&storage.PutObjectInput{
-				Bucket: aws.String(s3.StorageBucket),
-				Key:    aws.String(key),
-				Body:   bytes.NewReader(data),
-			})
+			if int64(len(data)) > maxPartSize {
+				err = s3.putObjectMultiPart(key, data)
+			} else {
+				_, err = s3.S3Session.PutObject(&storage.PutObjectInput{
+					Bucket: aws.String(s3.StorageBucket),
+					Key:    aws.String(key),
+					Body:   bytes.NewReader(data),
+				})
+			}
 			if !strings.Contains(key, "chunk.json") && !strings.Contains(key, "index.json") && !strings.Contains(key, "file.csv") {
 				isExist, integrity, _, _ = s3.VerifyObject(key)
 				if isExist {
 					if !integrity {
-						_, err = s3.S3Session.PutObject(&storage.PutObjectInput{
-							Bucket: aws.String(s3.StorageBucket),
-							Key:    aws.String(key),
-							Body:   bytes.NewReader(data),
-						})
+						if int64(len(data)) > maxPartSize {
+							err = s3.putObjectMultiPart(key, data)
+						} else {
+							_, err = s3.S3Session.PutObject(&storage.PutObjectInput{
+								Bucket: aws.String(s3.StorageBucket),
+								Key:    aws.String(key),
+								Body:   bytes.NewReader(data),
+							})
+						}
 						if err == nil {
 							break
 						}
@@ -227,7 +244,7 @@ func (s3 *S3) PutObject(key string, data []byte) error {
 	}
 	if aerr, ok := err.(awserr.Error); ok {
 		s3.logger.Sugar().Errorf("PutObject error: %s %s", aerr.Code(), aerr.Message())
-		if aerr.Code() == "AccessDenied" || aerr.Code() == "Forbidden" {
+		if aerr.Code() == "AccessDenied" || aerr.Code() == "Forbidden" || aerr.Code() == "SignatureDoesNotMatch" {
			if once {
 				s3.logger.Error("Return false cause in put object: ", zap.Error(err), zap.String("code", aerr.Code()), zap.String("key", key))
 				return err
@@ -252,6 +269,47 @@ func (s3 *S3) PutObject(key string, data []byte) error {
 	return err
 }
 
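+// putObjectMultiPart writes payloads larger than maxPartSize via the S3
+// multipart-upload API: the data is split into parts of at most maxPartSize
+// bytes, and the whole upload is aborted if any part fails.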
+func (s3 *S3) putObjectMultiPart(key string, data []byte) error {
+	respMPU, err := s3.createMultiPartUpload(key)
+	if err != nil {
+		return err
+	}
+	var curr, partLength int64
+	var remaining = int64(len(data))
+	var completedParts []*storage.CompletedPart
+	partNumber := 1
+	for curr = 0; remaining != 0; curr += partLength {
+		if remaining < maxPartSize {
+			partLength = remaining
+		} else {
+			partLength = maxPartSize
+		}
+		completedPart, err := s3.uploadPart(respMPU, data[curr:curr+partLength], partNumber)
+		if err != nil {
+			fmt.Println(err.Error())
+			if abortErr := s3.abortMultiPartUpload(respMPU); abortErr != nil {
+				s3.logger.Sugar().Error(abortErr.Error())
+				return abortErr
+			}
+			// The abort succeeded; still report the part-upload failure.
+			return err
+		}
+		remaining -= partLength
+		partNumber++
+		completedParts = append(completedParts, completedPart)
+	}
+
+	completeResponse, err := s3.completeMultiPartUpload(respMPU, completedParts)
+	if err != nil {
+		s3.logger.Sugar().Error(err.Error())
+		return err
+	}
+
+	s3.logger.Sugar().Infof("Successfully uploaded file: %s", completeResponse.String())
+	return nil
+}
+
 func (s3 *S3) GetObject(key string) ([]byte, error) {
 	var err error
 	var once bool
@@ -350,6 +408,190 @@ func (s3 *S3) HeadObject(key string) (bool, string, error) {
 	return false, "", err
 }
 
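+// createMultiPartUpload starts a multipart upload for key, retrying with
+// exponential backoff; access errors are retried once more after a short
+// random sleep before giving up.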
+func (s3 *S3) createMultiPartUpload(key string) (*storage.CreateMultipartUploadOutput, error) {
+	var err error
+	var resp *storage.CreateMultipartUploadOutput
+	var once bool
+	bo := backoff.NewExponentialBackOff()
+	bo.MaxInterval = maxRetry
+	bo.MaxElapsedTime = maxRetry
+	for {
+		resp, err = s3.S3Session.CreateMultipartUpload(&storage.CreateMultipartUploadInput{
+			Bucket: aws.String(s3.StorageBucket),
+			Key:    aws.String(key),
+		})
+		if err == nil {
+			s3.logger.Sugar().Info("Created MultiPartUpload for ", key)
+			return resp, nil
+		}
+
+		if aerr, ok := err.(awserr.Error); ok {
+			if aerr.Code() == "NotFound" {
+				return nil, err
+			}
+
+			s3.logger.Sugar().Errorf("CreateMultipartUpload error: %s %s", aerr.Code(), aerr.Message())
+			if aerr.Code() == "AccessDenied" || aerr.Code() == "Forbidden" {
+				if once {
+					s3.logger.Error("Return false cause in CreateMultipartUpload: ", zap.Error(err), zap.String("code", aerr.Code()), zap.String("key", key))
+					return nil, err
+				}
+				s3.logger.Sugar().Info("CreateMultipartUpload one more time ", key)
+				once = true
+				rand.Seed(time.Now().UnixNano())
+				n := rand.Intn(3) // n will be between 0 and 2
+				time.Sleep(time.Duration(n) * time.Second)
+			}
+		}
+		s3.logger.Debug("CreateMultipartUpload error. Retrying")
+		d := bo.NextBackOff()
+		if d == backoff.Stop {
+			s3.logger.Debug("CreateMultipartUpload error. Retry time out", zap.Error(err))
+			break
+		}
+		s3.logger.Sugar().Info("CreateMultipartUpload error. Retry in ", d)
+		time.Sleep(d)
+	}
+	return nil, err
+}
+
+// completeMultiPartUpload finalizes a multipart upload from its completed parts.
+func (s3 *S3) completeMultiPartUpload(mpuOut *storage.CreateMultipartUploadOutput, parts []*storage.CompletedPart) (*storage.CompleteMultipartUploadOutput, error) {
+	var err error
+	var resp *storage.CompleteMultipartUploadOutput
+	var once bool
+	bo := backoff.NewExponentialBackOff()
+	bo.MaxInterval = maxRetry
+	bo.MaxElapsedTime = maxRetry
+	for {
+		resp, err = s3.S3Session.CompleteMultipartUpload(&storage.CompleteMultipartUploadInput{
+			Bucket:   aws.String(s3.StorageBucket),
+			Key:      mpuOut.Key,
+			UploadId: mpuOut.UploadId,
+			MultipartUpload: &storage.CompletedMultipartUpload{
+				Parts: parts,
+			},
+		})
+		if err == nil {
+			s3.logger.Sugar().Info("Completed Multipart Upload ", *mpuOut.Key)
+			return resp, nil
+		}
+
+		if aerr, ok := err.(awserr.Error); ok {
+			if aerr.Code() == "NotFound" {
+				return nil, err
+			}
+
+			s3.logger.Sugar().Errorf("CompleteMultipartUpload error: %s %s", aerr.Code(), aerr.Message())
+			if aerr.Code() == "AccessDenied" || aerr.Code() == "Forbidden" {
+				if once {
+					s3.logger.Error("Return false cause in CompleteMultipartUpload: ", zap.Error(err), zap.String("code", aerr.Code()), zap.String("key", *mpuOut.Key))
+					return nil, err
+				}
+				s3.logger.Sugar().Info("CompleteMultipartUpload one more time ", *mpuOut.Key)
+				once = true
+				rand.Seed(time.Now().UnixNano())
+				n := rand.Intn(3) // n will be between 0 and 2
+				time.Sleep(time.Duration(n) * time.Second)
+			}
+		}
+		s3.logger.Debug("CompleteMultipartUpload error. Retrying")
+		d := bo.NextBackOff()
+		if d == backoff.Stop {
+			s3.logger.Debug("CompleteMultipartUpload error. Retry time out", zap.Error(err))
+			break
+		}
+		s3.logger.Sugar().Info("CompleteMultipartUpload error. Retry in ", d)
+		time.Sleep(d)
+	}
+	return nil, err
+}
+
+// abortMultiPartUpload cancels a multipart upload so S3 discards any parts
+// that were already uploaded.
+func (s3 *S3) abortMultiPartUpload(mpuOut *storage.CreateMultipartUploadOutput) error {
+	var err error
+	var once bool
+	bo := backoff.NewExponentialBackOff()
+	bo.MaxInterval = maxRetry
+	bo.MaxElapsedTime = maxRetry
+	for {
+		_, err = s3.S3Session.AbortMultipartUpload(&storage.AbortMultipartUploadInput{
+			Bucket:   aws.String(s3.StorageBucket),
+			Key:      mpuOut.Key,
+			UploadId: mpuOut.UploadId,
+		})
+		if err == nil {
+			s3.logger.Sugar().Info("Aborted Multipart Upload ", *mpuOut.Key)
+			return nil
+		}
+
+		if aerr, ok := err.(awserr.Error); ok {
+			if aerr.Code() == "NotFound" {
+				return err
+			}
+
+			s3.logger.Sugar().Errorf("AbortMultipartUpload error: %s %s", aerr.Code(), aerr.Message())
+			if aerr.Code() == "AccessDenied" || aerr.Code() == "Forbidden" {
+				if once {
+					s3.logger.Error("Return false cause in AbortMultipartUpload: ", zap.Error(err), zap.String("code", aerr.Code()), zap.String("key", *mpuOut.Key))
+					return err
+				}
+				s3.logger.Sugar().Info("AbortMultipartUpload one more time ", *mpuOut.Key)
+				once = true
+				rand.Seed(time.Now().UnixNano())
+				n := rand.Intn(3) // n will be between 0 and 2
+				time.Sleep(time.Duration(n) * time.Second)
+			}
+		}
+		s3.logger.Debug("AbortMultipartUpload error. Retrying")
+		d := bo.NextBackOff()
+		if d == backoff.Stop {
+			s3.logger.Debug("AbortMultipartUpload error. Retry time out", zap.Error(err))
+			break
+		}
+		s3.logger.Sugar().Info("AbortMultipartUpload error. Retry in ", d)
+		time.Sleep(d)
+	}
+	return err
+}
+
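+// uploadPart uploads a single part of a multipart upload, retrying up to
+// three times before reporting the failure to the caller.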
+func (s3 *S3) uploadPart(resp *storage.CreateMultipartUploadOutput, fileBytes []byte, partNum int) (*storage.CompletedPart, error) {
+	tryNum := 1
+	maxRetries := 3
+	partInput := &storage.UploadPartInput{
+		Body:          bytes.NewReader(fileBytes),
+		Bucket:        resp.Bucket,
+		Key:           resp.Key,
+		PartNumber:    aws.Int64(int64(partNum)),
+		UploadId:      resp.UploadId,
+		ContentLength: aws.Int64(int64(len(fileBytes))),
+	}
+
+	for tryNum <= maxRetries {
+		uploadResult, err := s3.S3Session.UploadPart(partInput)
+		if err != nil {
+			if tryNum == maxRetries {
+				if aerr, ok := err.(awserr.Error); ok {
+					return nil, aerr
+				}
+				return nil, err
+			}
+			s3.logger.Sugar().Infof("Retrying to upload part #%v", partNum)
+			tryNum++
+		} else {
+			s3.logger.Sugar().Infof("Uploaded part #%v", partNum)
+			return &storage.CompletedPart{
+				ETag:       uploadResult.ETag,
+				PartNumber: aws.Int64(int64(partNum)),
+			}, nil
+		}
+	}
+	return nil, nil
+}
+
 func (s3 *S3) RefreshCredential(credential storage_vault.Credential) error {
 	cred := credentials.NewStaticCredentials(credential.AwsAccessKeyId, credential.AwsSecretAccessKey, credential.Token)
 	_, err := cred.Get()