Skip to content

Commit

Permalink
add alreadySync logic
Browse files Browse the repository at this point in the history
  • Loading branch information
RidRisR committed Sep 24, 2024
1 parent b1b481b commit 7f8a249
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 40 deletions.
46 changes: 36 additions & 10 deletions pkg/apis/pingcap/v1alpha1/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,9 @@ func IsBackupCleanFailed(backup *Backup) bool {

// IsCleanCandidate returns true if a Backup should be added to clean candidate according to cleanPolicy
func IsCleanCandidate(backup *Backup) bool {
if backup.Spec.Mode == BackupModeLog {
return true
}
switch backup.Spec.CleanPolicy {
case CleanPolicyTypeDelete, CleanPolicyTypeOnFailure:
return true
Expand All @@ -324,34 +327,57 @@ func NeedNotClean(backup *Backup) bool {
}

// ParseLogBackupSubcommand parse the log backup subcommand from cr.
// The parse priority of the command is stop > truncate > start.
func ParseLogBackupSubcommand(backup *Backup) LogSubCommandType {
if backup.Spec.Mode != BackupModeLog {
return ""
}

//Compatible with the old version
// Compatible with the old version
if backup.Spec.LogStop {
if isSubcommandAlreadySync(backup, LogStopCommand) {
return LogTruncateCommand
}
return LogStopCommand
}

var subCommand LogSubCommandType
switch backup.Spec.LogSubcommand {
case "log-start":
if IsLogBackupAlreadyPaused(backup) {
return LogResumeCommand
subCommand = LogResumeCommand
} else {
subCommand = LogStartCommand
}
return LogStartCommand
case "log-stop":
return LogStopCommand
subCommand = LogStopCommand
case "log-pause":
return LogPauseCommand
subCommand = LogPauseCommand
case "log-truncate":
subCommand = LogTruncateCommand
default:
subCommand = LogStartCommand
}

// If the selected subcommand is already done, switch to LogTruncateCommand
if isSubcommandAlreadySync(backup, subCommand) {
return LogTruncateCommand
}

return subCommand
}

func isSubcommandAlreadySync(backup *Backup, subCommand LogSubCommandType) bool {
switch subCommand {
case LogStartCommand:
return IsLogBackupAlreadyStart(backup)
case LogStopCommand:
return IsLogBackupAlreadyStop(backup)
case LogPauseCommand:
return IsLogBackupAlreadyPaused(backup)
case LogResumeCommand:
return IsLogBackupAlreadyRunning(backup)
default:
if backup.Spec.LogTruncateUntil != "" {
return LogTruncateCommand
}
return LogStartCommand
return false
}
}

Expand Down
50 changes: 20 additions & 30 deletions pkg/backup/backup/backup_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1078,46 +1078,36 @@ func (bm *backupManager) skipSnapshotBackupSync(backup *v1alpha1.Backup) (bool,
return false, nil
}

// skipLogBackupSync skip log backup, returns true if can be skipped.
// skipLogBackupSync skips log backup, returns true if it can be skipped.
func (bm *backupManager) skipLogBackupSync(backup *v1alpha1.Backup) (bool, error) {
if backup.Spec.Mode != v1alpha1.BackupModeLog {
return false, nil
}
var skip bool
var err error

command := v1alpha1.ParseLogBackupSubcommand(backup)
switch command {
case v1alpha1.LogStartCommand:
skip = v1alpha1.IsLogBackupAlreadyStart(backup)
case v1alpha1.LogStopCommand:
skip = v1alpha1.IsLogBackupAlreadyStop(backup)
case v1alpha1.LogPauseCommand:
skip = v1alpha1.IsLogBackupAlreadyPaused(backup)
case v1alpha1.LogResumeCommand:
skip = v1alpha1.IsLogBackupAlreadyRunning(backup)
case v1alpha1.LogTruncateCommand:
if v1alpha1.IsLogBackupAlreadyTruncate(backup) {
skip = true
// if skip truncate, we need update truncate to be complete, and truncating util is the spec's truncate until.
updateStatus := &controller.BackupUpdateStatus{
TimeStarted: &metav1.Time{Time: time.Now()},
TimeCompleted: &metav1.Time{Time: time.Now()},
LogTruncatingUntil: &backup.Spec.LogTruncateUntil,
}
err = bm.statusUpdater.Update(backup, &v1alpha1.BackupCondition{
Command: v1alpha1.LogTruncateCommand,
Type: v1alpha1.BackupComplete,
Status: corev1.ConditionTrue,
}, updateStatus)

// Handle the special case for LogTruncateCommand where additional actions are needed
var err error
if command == v1alpha1.LogTruncateCommand && v1alpha1.IsLogBackupAlreadyTruncate(backup) {
// If skipping truncate, update status
updateStatus := &controller.BackupUpdateStatus{
TimeStarted: &metav1.Time{Time: time.Now()},
TimeCompleted: &metav1.Time{Time: time.Now()},
LogTruncatingUntil: &backup.Spec.LogTruncateUntil,
}
default:
return false, nil
err = bm.statusUpdater.Update(backup, &v1alpha1.BackupCondition{
Command: v1alpha1.LogTruncateCommand,
Type: v1alpha1.BackupComplete,
Status: corev1.ConditionTrue,
}, updateStatus)
}

if skip {
if command == v1alpha1.LogTruncateCommand {
klog.Infof("log backup %s/%s subcommand %s is already done, will skip sync.", backup.Namespace, backup.Name, command)
return true, err
}
return skip, err

return false, nil
}

// skipVolumeSnapshotBackupSync skip volume snapshot backup, returns true if can be skipped.
Expand Down

0 comments on commit 7f8a249

Please sign in to comment.