From 1778891c2d465dcce3147574a0bdd3e35a954f52 Mon Sep 17 00:00:00 2001 From: Dmytro Zghoba Date: Tue, 18 Jul 2023 18:29:42 +0300 Subject: [PATCH 1/4] [PBM-1154] add backup.timeouts.startingStatus --- agent/restore.go | 2 +- cli/backup.go | 2 +- pbm/backup/backup.go | 8 +++++++- pbm/backup/logical.go | 2 +- pbm/backup/physical.go | 2 +- pbm/config.go | 16 ++++++++++++++++ pbm/pbm.go | 3 +-- pbm/pitr/pitr.go | 14 +++++++------- 8 files changed, 35 insertions(+), 14 deletions(-) diff --git a/agent/restore.go b/agent/restore.go index e2bbd0722..454033742 100644 --- a/agent/restore.go +++ b/agent/restore.go @@ -211,7 +211,7 @@ func (a *Agent) pitr() (err error) { w: w, }) - streamErr := ibcp.Stream(ctx, w, cfg.PITR.Compression, cfg.PITR.CompressionLevel) + streamErr := ibcp.Stream(ctx, w, cfg.PITR.Compression, cfg.PITR.CompressionLevel, cfg.Backup.Timeouts) if streamErr != nil { switch streamErr.(type) { case pitr.ErrOpMoved: diff --git a/cli/backup.go b/cli/backup.go index d96486a49..725a579af 100644 --- a/cli/backup.go +++ b/cli/backup.go @@ -134,7 +134,7 @@ func runBackup(cn *pbm.PBM, b *backupOpts, outf outFormat) (fmt.Stringer, error) } fmt.Printf("Starting backup '%s'", b.name) - ctx, cancel := context.WithTimeout(context.Background(), pbm.WaitBackupStart) + ctx, cancel := context.WithTimeout(context.Background(), cfg.Backup.Timeouts.StartingStatus()) defer cancel() err = waitForBcpStatus(ctx, cn, b.name) if err != nil { diff --git a/pbm/backup/backup.go b/pbm/backup/backup.go index 2c2a1e50b..c40e170cc 100644 --- a/pbm/backup/backup.go +++ b/pbm/backup/backup.go @@ -32,6 +32,7 @@ type Backup struct { node *pbm.Node typ pbm.BackupType incrBase bool + timeouts *pbm.BackupTimeouts } func New(cn *pbm.PBM, node *pbm.Node) *Backup { @@ -97,6 +98,7 @@ func (b *Backup) Init(bcp *pbm.BackupCmd, opid pbm.OPID, inf *pbm.NodeInfo, bala return errors.Wrap(err, "unable to get PBM config settings") } meta.Store = cfg.Storage + b.timeouts = cfg.Backup.Timeouts ver, err := 
b.node.GetMongoVersion() if err != nil { @@ -232,7 +234,7 @@ func (b *Backup) Run(ctx context.Context, bcp *pbm.BackupCmd, opid pbm.OPID, l * // Waiting for StatusStarting to move further. // In case some preparations has to be done before backup. - err = b.waitForStatus(bcp.Name, pbm.StatusStarting, &pbm.WaitBackupStart) + err = b.waitForStatus(bcp.Name, pbm.StatusStarting, ref(b.timeouts.StartingStatus())) if err != nil { return errors.Wrap(err, "waiting for start") } @@ -682,3 +684,7 @@ func (b *Backup) setClusterLastWrite(bcpName string) error { err = b.cn.SetLastWrite(bcpName, lw) return errors.Wrap(err, "set timestamp") } + +func ref[T any](v T) *T { + return &v +} diff --git a/pbm/backup/logical.go b/pbm/backup/logical.go index f02afc452..ab4000064 100644 --- a/pbm/backup/logical.go +++ b/pbm/backup/logical.go @@ -60,7 +60,7 @@ func (b *Backup) doLogical(ctx context.Context, bcp *pbm.BackupCmd, opid pbm.OPI } if inf.IsLeader() { - err := b.reconcileStatus(bcp.Name, opid.String(), pbm.StatusRunning, &pbm.WaitBackupStart) + err := b.reconcileStatus(bcp.Name, opid.String(), pbm.StatusRunning, ref(b.timeouts.StartingStatus())) if err != nil { if errors.Cause(err) == errConvergeTimeOut { return errors.Wrap(err, "couldn't get response from all shards") diff --git a/pbm/backup/physical.go b/pbm/backup/physical.go index 1c2f3db83..67e15dec4 100644 --- a/pbm/backup/physical.go +++ b/pbm/backup/physical.go @@ -245,7 +245,7 @@ func (b *Backup) doPhysical(ctx context.Context, bcp *pbm.BackupCmd, opid pbm.OP } if inf.IsLeader() { - err := b.reconcileStatus(bcp.Name, opid.String(), pbm.StatusRunning, &pbm.WaitBackupStart) + err := b.reconcileStatus(bcp.Name, opid.String(), pbm.StatusRunning, ref(b.timeouts.StartingStatus())) if err != nil { if errors.Cause(err) == errConvergeTimeOut { return errors.Wrap(err, "couldn't get response from all shards") diff --git a/pbm/config.go b/pbm/config.go index 987d028d6..e6b747a4f 100644 --- a/pbm/config.go +++ b/pbm/config.go @@ 
-148,10 +148,26 @@ type RestoreConf struct { type BackupConf struct { Priority map[string]float64 `bson:"priority,omitempty" json:"priority,omitempty" yaml:"priority,omitempty"` + Timeouts *BackupTimeouts `bson:"timeouts,omitempty" json:"timeouts,omitempty" yaml:"timeouts,omitempty"` Compression compress.CompressionType `bson:"compression,omitempty" json:"compression,omitempty" yaml:"compression,omitempty"` CompressionLevel *int `bson:"compressionLevel,omitempty" json:"compressionLevel,omitempty" yaml:"compressionLevel,omitempty"` } +type BackupTimeouts struct { + // Starting is the timeout (in seconds) to wait for a backup to start. + Starting *uint32 `bson:"startingStatus,omitempty" json:"startingStatus,omitempty" yaml:"startingStatus,omitempty"` +} + +// StartingStatus returns the timeout duration to wait for a backup to start. +// If not set or zero, returns default value (WaitBackupStart). +func (t *BackupTimeouts) StartingStatus() time.Duration { + if t == nil || t.Starting == nil || *t.Starting == 0 { + return WaitBackupStart + } + + return time.Duration(*t.Starting) * time.Second +} + type confMap map[string]reflect.Kind // _confmap is a list of config's valid keys and its types diff --git a/pbm/pbm.go b/pbm/pbm.go index 4b269211a..919962845 100644 --- a/pbm/pbm.go +++ b/pbm/pbm.go @@ -199,7 +199,6 @@ func (r RestoreCmd) String() string { } return fmt.Sprintf("name: %s, %s", r.Name, bcp) - } type ReplayCmd struct { @@ -237,7 +236,7 @@ const ( var ( WaitActionStart = time.Second * 15 - WaitBackupStart = WaitActionStart + PITRcheckRange*12/10 + WaitBackupStart = WaitActionStart + PITRcheckRange*12/10 // 33 seconds ) // OpLog represents log of started operation. 
diff --git a/pbm/pitr/pitr.go b/pbm/pitr/pitr.go index 6abd81d49..78908a5f8 100644 --- a/pbm/pitr/pitr.go +++ b/pbm/pitr/pitr.go @@ -247,7 +247,7 @@ func (e ErrOpMoved) Error() string { const LogStartMsg = "start_ok" // Stream streaming (saving) chunks of the oplog to the given storage -func (s *Slicer) Stream(ctx context.Context, backupSig <-chan *pbm.OPID, compression compress.CompressionType, level *int) error { +func (s *Slicer) Stream(ctx context.Context, backupSig <-chan *pbm.OPID, compression compress.CompressionType, level *int, timeouts *pbm.BackupTimeouts) error { if s.lastTS.T == 0 { return errors.New("no starting point defined") } @@ -290,7 +290,7 @@ func (s *Slicer) Stream(ctx context.Context, backupSig <-chan *pbm.OPID, compres s.l.Info("got wake_up signal") if bcp != nil { s.l.Info("wake_up for bcp %s", bcp.String()) - sliceTo, err = s.backupStartTS(bcp.String()) + sliceTo, err = s.backupStartTS(bcp.String(), timeouts.StartingStatus()) if err != nil { return errors.Wrap(err, "get backup start TS") } @@ -332,7 +332,7 @@ func (s *Slicer) Stream(ctx context.Context, backupSig <-chan *pbm.OPID, compres // and a new worker was elected; // - any other case (including no lock) is the undefined behaviour - return. 
// - ld, err := s.getOpLock(llock) + ld, err := s.getOpLock(llock, timeouts.StartingStatus()) if err != nil { return errors.Wrap(err, "check lock") } @@ -441,10 +441,10 @@ func formatts(t primitive.Timestamp) string { return time.Unix(int64(t.T), 0).UTC().Format("2006-01-02T15:04:05") } -func (s *Slicer) getOpLock(l *pbm.LockHeader) (ld pbm.LockData, err error) { +func (s *Slicer) getOpLock(l *pbm.LockHeader, t time.Duration) (ld pbm.LockData, err error) { tk := time.NewTicker(time.Second) defer tk.Stop() - for j := 0; j < int(pbm.WaitBackupStart.Seconds()); j++ { + for j := 0; j < int(t.Seconds()); j++ { ld, err = s.pbm.GetLockData(l) if err != nil && !errors.Is(err, mongo.ErrNoDocuments) { return ld, errors.Wrap(err, "get") @@ -458,10 +458,10 @@ func (s *Slicer) getOpLock(l *pbm.LockHeader) (ld pbm.LockData, err error) { return ld, nil } -func (s *Slicer) backupStartTS(opid string) (ts primitive.Timestamp, err error) { +func (s *Slicer) backupStartTS(opid string, t time.Duration) (ts primitive.Timestamp, err error) { tk := time.NewTicker(time.Second) defer tk.Stop() - for j := 0; j < int(pbm.WaitBackupStart.Seconds()); j++ { + for j := 0; j < int(t.Seconds()); j++ { b, err := s.pbm.GetBackupByOPID(opid) if err != nil && err != pbm.ErrNotFound { return ts, errors.Wrap(err, "get backup meta") From 4ba2a2f32e60c6181adf374ca9d3dc6631e658f8 Mon Sep 17 00:00:00 2001 From: Dmytro Zghoba Date: Tue, 18 Jul 2023 19:27:01 +0300 Subject: [PATCH 2/4] [PBM-1154] fix config --set --- pbm/config.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pbm/config.go b/pbm/config.go index e6b747a4f..39710cddd 100644 --- a/pbm/config.go +++ b/pbm/config.go @@ -267,7 +267,9 @@ func (p *PBM) SetConfigVar(key, val string) error { switch _confmap[key] { case reflect.String: v = val - case reflect.Int, reflect.Int64: + case reflect.Uint, reflect.Uint32, reflect.Uint64: + v, err = strconv.ParseUint(val, 10, 64) + case reflect.Int, reflect.Int32, reflect.Int64: v, err = 
strconv.ParseInt(val, 10, 64) case reflect.Float32, reflect.Float64: v, err = strconv.ParseFloat(val, 64) From c3357b9a4341225f41fa7fc324c85f21fb0c46c7 Mon Sep 17 00:00:00 2001 From: Dmytro Zghoba Date: Thu, 20 Jul 2023 10:09:52 +0300 Subject: [PATCH 3/4] [PBM-1154] ensure timeouts on each node --- agent/backup.go | 14 +++++++++++++- pbm/backup/backup.go | 16 ++++++---------- pbm/storage/storage.go | 16 ++++++++++++++++ 3 files changed, 35 insertions(+), 11 deletions(-) diff --git a/agent/backup.go b/agent/backup.go index f5e4fbc16..1612f7eb2 100644 --- a/agent/backup.go +++ b/agent/backup.go @@ -9,6 +9,7 @@ import ( "github.com/percona/percona-backup-mongodb/pbm" "github.com/percona/percona-backup-mongodb/pbm/backup" "github.com/percona/percona-backup-mongodb/pbm/log" + "github.com/percona/percona-backup-mongodb/pbm/storage" ) type currentBackup struct { @@ -96,6 +97,17 @@ func (a *Agent) Backup(cmd *pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) { bcp = backup.New(a.pbm, a.node) } + cfg, err := a.pbm.GetConfig() + if err != nil { + l.Error("unable to get PBM config settings: " + err.Error()) + return + } + if storage.ParseType(string(cfg.Storage.Type)) == storage.Undef { + l.Error("backups cannot be saved because PBM storage configuration hasn't been set yet") + return + } + bcp.SetTimeouts(cfg.Backup.Timeouts) + if nodeInfo.IsClusterLeader() { balancer := pbm.BalancerModeOff if nodeInfo.IsSharded() { @@ -108,7 +120,7 @@ func (a *Agent) Backup(cmd *pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) { balancer = pbm.BalancerModeOn } } - err = bcp.Init(cmd, opid, nodeInfo, balancer) + err = bcp.Init(cmd, opid, nodeInfo, cfg.Storage, balancer) if err != nil { l.Error("init meta: %v", err) return diff --git a/pbm/backup/backup.go b/pbm/backup/backup.go index c40e170cc..97231ec04 100644 --- a/pbm/backup/backup.go +++ b/pbm/backup/backup.go @@ -68,7 +68,11 @@ func NewIncremental(cn *pbm.PBM, node *pbm.Node, base bool) *Backup { } } -func (b *Backup) Init(bcp *pbm.BackupCmd, opid 
pbm.OPID, inf *pbm.NodeInfo, balancer pbm.BalancerMode) error { +func (b *Backup) SetTimeouts(t *pbm.BackupTimeouts) { + b.timeouts = t +} + +func (b *Backup) Init(bcp *pbm.BackupCmd, opid pbm.OPID, inf *pbm.NodeInfo, store pbm.StorageConf, balancer pbm.BalancerMode) error { ts, err := b.cn.ClusterTime() if err != nil { return errors.Wrap(err, "read cluster time") @@ -80,6 +84,7 @@ func (b *Backup) Init(bcp *pbm.BackupCmd, opid pbm.OPID, inf *pbm.NodeInfo, bala Name: bcp.Name, Namespaces: bcp.Namespaces, Compression: bcp.Compression, + Store: store, StartTS: time.Now().Unix(), Status: pbm.StatusStarting, Replsets: []pbm.BackupReplset{}, @@ -91,15 +96,6 @@ func (b *Backup) Init(bcp *pbm.BackupCmd, opid pbm.OPID, inf *pbm.NodeInfo, bala Hb: ts, } - cfg, err := b.cn.GetConfig() - if err == pbm.ErrStorageUndefined { - return errors.New("backups cannot be saved because PBM storage configuration hasn't been set yet") - } else if err != nil { - return errors.Wrap(err, "unable to get PBM config settings") - } - meta.Store = cfg.Storage - b.timeouts = cfg.Backup.Timeouts - ver, err := b.node.GetMongoVersion() if err != nil { return errors.WithMessage(err, "get mongo version") diff --git a/pbm/storage/storage.go b/pbm/storage/storage.go index 58d29de09..069479b06 100644 --- a/pbm/storage/storage.go +++ b/pbm/storage/storage.go @@ -42,3 +42,19 @@ type Storage interface { // Copy makes a copy of the src objec/file under dst name Copy(src, dst string) error } + +// ParseType parses string and returns storage type +func ParseType(s string) Type { + switch s { + case string(S3): + return S3 + case string(Azure): + return Azure + case string(Filesystem): + return Filesystem + case string(BlackHole): + return BlackHole + default: + return Undef + } +} From 480a0a7d944f11cd6ba1061067f000daecba83ec Mon Sep 17 00:00:00 2001 From: Dmytro Zghoba Date: Thu, 20 Jul 2023 18:30:12 +0300 Subject: [PATCH 4/4] [PBM-1154] fix parsing --- pbm/config.go | 12 +++++++++--- 1 file changed, 9 
insertions(+), 3 deletions(-) diff --git a/pbm/config.go b/pbm/config.go index 39710cddd..82f625686 100644 --- a/pbm/config.go +++ b/pbm/config.go @@ -267,11 +267,17 @@ func (p *PBM) SetConfigVar(key, val string) error { switch _confmap[key] { case reflect.String: v = val - case reflect.Uint, reflect.Uint32, reflect.Uint64: + case reflect.Uint, reflect.Uint32: + v, err = strconv.ParseUint(val, 10, 32) + case reflect.Uint64: v, err = strconv.ParseUint(val, 10, 64) - case reflect.Int, reflect.Int32, reflect.Int64: + case reflect.Int, reflect.Int32: + v, err = strconv.ParseInt(val, 10, 32) + case reflect.Int64: v, err = strconv.ParseInt(val, 10, 64) - case reflect.Float32, reflect.Float64: + case reflect.Float32: + v, err = strconv.ParseFloat(val, 32) + case reflect.Float64: v, err = strconv.ParseFloat(val, 64) case reflect.Bool: v, err = strconv.ParseBool(val)