Skip to content

Commit

Permalink
[PBM-1154] add backup.timeouts.startingStatus (#863)
Browse files Browse the repository at this point in the history
  • Loading branch information
defbin committed Jul 24, 2023
1 parent 4fea64a commit 152dc02
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 26 deletions.
14 changes: 13 additions & 1 deletion agent/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/percona/percona-backup-mongodb/pbm"
"github.com/percona/percona-backup-mongodb/pbm/backup"
"github.com/percona/percona-backup-mongodb/pbm/log"
"github.com/percona/percona-backup-mongodb/pbm/storage"
)

type currentBackup struct {
Expand Down Expand Up @@ -96,6 +97,17 @@ func (a *Agent) Backup(cmd *pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) {
bcp = backup.New(a.pbm, a.node)
}

cfg, err := a.pbm.GetConfig()
if err != nil {
l.Error("unable to get PBM config settings: " + err.Error())
return
}
if storage.ParseType(string(cfg.Storage.Type)) == storage.Undef {
l.Error("backups cannot be saved because PBM storage configuration hasn't been set yet")
return
}
bcp.SetTimeouts(cfg.Backup.Timeouts)

if nodeInfo.IsClusterLeader() {
balancer := pbm.BalancerModeOff
if nodeInfo.IsSharded() {
Expand All @@ -108,7 +120,7 @@ func (a *Agent) Backup(cmd *pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) {
balancer = pbm.BalancerModeOn
}
}
err = bcp.Init(cmd, opid, nodeInfo, balancer)
err = bcp.Init(cmd, opid, nodeInfo, cfg.Storage, balancer)
if err != nil {
l.Error("init meta: %v", err)
return
Expand Down
2 changes: 1 addition & 1 deletion agent/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (a *Agent) pitr() (err error) {
w: w,
})

streamErr := ibcp.Stream(ctx, w, cfg.PITR.Compression, cfg.PITR.CompressionLevel)
streamErr := ibcp.Stream(ctx, w, cfg.PITR.Compression, cfg.PITR.CompressionLevel, cfg.Backup.Timeouts)
if streamErr != nil {
switch streamErr.(type) {
case pitr.ErrOpMoved:
Expand Down
2 changes: 1 addition & 1 deletion cli/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func runBackup(cn *pbm.PBM, b *backupOpts, outf outFormat) (fmt.Stringer, error)
}

fmt.Printf("Starting backup '%s'", b.name)
ctx, cancel := context.WithTimeout(context.Background(), pbm.WaitBackupStart)
ctx, cancel := context.WithTimeout(context.Background(), cfg.Backup.Timeouts.StartingStatus())
defer cancel()
err = waitForBcpStatus(ctx, cn, b.name)
if err != nil {
Expand Down
22 changes: 12 additions & 10 deletions pbm/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Backup struct {
node *pbm.Node
typ pbm.BackupType
incrBase bool
timeouts *pbm.BackupTimeouts
}

func New(cn *pbm.PBM, node *pbm.Node) *Backup {
Expand Down Expand Up @@ -67,7 +68,11 @@ func NewIncremental(cn *pbm.PBM, node *pbm.Node, base bool) *Backup {
}
}

func (b *Backup) Init(bcp *pbm.BackupCmd, opid pbm.OPID, inf *pbm.NodeInfo, balancer pbm.BalancerMode) error {
func (b *Backup) SetTimeouts(t *pbm.BackupTimeouts) {
b.timeouts = t
}

func (b *Backup) Init(bcp *pbm.BackupCmd, opid pbm.OPID, inf *pbm.NodeInfo, store pbm.StorageConf, balancer pbm.BalancerMode) error {
ts, err := b.cn.ClusterTime()
if err != nil {
return errors.Wrap(err, "read cluster time")
Expand All @@ -79,6 +84,7 @@ func (b *Backup) Init(bcp *pbm.BackupCmd, opid pbm.OPID, inf *pbm.NodeInfo, bala
Name: bcp.Name,
Namespaces: bcp.Namespaces,
Compression: bcp.Compression,
Store: store,
StartTS: time.Now().Unix(),
Status: pbm.StatusStarting,
Replsets: []pbm.BackupReplset{},
Expand All @@ -90,14 +96,6 @@ func (b *Backup) Init(bcp *pbm.BackupCmd, opid pbm.OPID, inf *pbm.NodeInfo, bala
Hb: ts,
}

cfg, err := b.cn.GetConfig()
if err == pbm.ErrStorageUndefined {
return errors.New("backups cannot be saved because PBM storage configuration hasn't been set yet")
} else if err != nil {
return errors.Wrap(err, "unable to get PBM config settings")
}
meta.Store = cfg.Storage

ver, err := b.node.GetMongoVersion()
if err != nil {
return errors.WithMessage(err, "get mongo version")
Expand Down Expand Up @@ -232,7 +230,7 @@ func (b *Backup) Run(ctx context.Context, bcp *pbm.BackupCmd, opid pbm.OPID, l *

// Waiting for StatusStarting to move further.
// In case some preparations has to be done before backup.
err = b.waitForStatus(bcp.Name, pbm.StatusStarting, &pbm.WaitBackupStart)
err = b.waitForStatus(bcp.Name, pbm.StatusStarting, ref(b.timeouts.StartingStatus()))
if err != nil {
return errors.Wrap(err, "waiting for start")
}
Expand Down Expand Up @@ -682,3 +680,7 @@ func (b *Backup) setClusterLastWrite(bcpName string) error {
err = b.cn.SetLastWrite(bcpName, lw)
return errors.Wrap(err, "set timestamp")
}

func ref[T any](v T) *T {
return &v
}
2 changes: 1 addition & 1 deletion pbm/backup/logical.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (b *Backup) doLogical(ctx context.Context, bcp *pbm.BackupCmd, opid pbm.OPI
}

if inf.IsLeader() {
err := b.reconcileStatus(bcp.Name, opid.String(), pbm.StatusRunning, &pbm.WaitBackupStart)
err := b.reconcileStatus(bcp.Name, opid.String(), pbm.StatusRunning, ref(b.timeouts.StartingStatus()))
if err != nil {
if errors.Cause(err) == errConvergeTimeOut {
return errors.Wrap(err, "couldn't get response from all shards")
Expand Down
2 changes: 1 addition & 1 deletion pbm/backup/physical.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (b *Backup) doPhysical(ctx context.Context, bcp *pbm.BackupCmd, opid pbm.OP
}

if inf.IsLeader() {
err := b.reconcileStatus(bcp.Name, opid.String(), pbm.StatusRunning, &pbm.WaitBackupStart)
err := b.reconcileStatus(bcp.Name, opid.String(), pbm.StatusRunning, ref(b.timeouts.StartingStatus()))
if err != nil {
if errors.Cause(err) == errConvergeTimeOut {
return errors.Wrap(err, "couldn't get response from all shards")
Expand Down
28 changes: 26 additions & 2 deletions pbm/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,26 @@ type RestoreConf struct {

type BackupConf struct {
Priority map[string]float64 `bson:"priority,omitempty" json:"priority,omitempty" yaml:"priority,omitempty"`
Timeouts *BackupTimeouts `bson:"timeouts,omitempty" json:"timeouts,omitempty" yaml:"timeouts,omitempty"`
Compression compress.CompressionType `bson:"compression,omitempty" json:"compression,omitempty" yaml:"compression,omitempty"`
CompressionLevel *int `bson:"compressionLevel,omitempty" json:"compressionLevel,omitempty" yaml:"compressionLevel,omitempty"`
}

type BackupTimeouts struct {
// Starting is timeout (in seconds) to wait for a backup to start.
Starting *uint32 `bson:"startingStatus,omitempty" json:"startingStatus,omitempty" yaml:"startingStatus,omitempty"`
}

// StartingStatus returns timeout duration for .
// If not set or zero, returns default value (WaitBackupStart).
func (t *BackupTimeouts) StartingStatus() time.Duration {
if t == nil || t.Starting == nil || *t.Starting == 0 {
return WaitBackupStart
}

return time.Duration(*t.Starting) * time.Second
}

type confMap map[string]reflect.Kind

// _confmap is a list of config's valid keys and its types
Expand Down Expand Up @@ -251,9 +267,17 @@ func (p *PBM) SetConfigVar(key, val string) error {
switch _confmap[key] {
case reflect.String:
v = val
case reflect.Int, reflect.Int64:
case reflect.Uint, reflect.Uint32:
v, err = strconv.ParseUint(val, 10, 32)
case reflect.Uint64:
v, err = strconv.ParseUint(val, 10, 64)
case reflect.Int, reflect.Int32:
v, err = strconv.ParseInt(val, 10, 32)
case reflect.Int64:
v, err = strconv.ParseInt(val, 10, 64)
case reflect.Float32, reflect.Float64:
case reflect.Float32:
v, err = strconv.ParseFloat(val, 32)
case reflect.Float64:
v, err = strconv.ParseFloat(val, 64)
case reflect.Bool:
v, err = strconv.ParseBool(val)
Expand Down
3 changes: 1 addition & 2 deletions pbm/pbm.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ func (r RestoreCmd) String() string {
}

return fmt.Sprintf("name: %s, %s", r.Name, bcp)

}

type ReplayCmd struct {
Expand Down Expand Up @@ -237,7 +236,7 @@ const (

var (
WaitActionStart = time.Second * 15
WaitBackupStart = WaitActionStart + PITRcheckRange*12/10
WaitBackupStart = WaitActionStart + PITRcheckRange*12/10 // 33 seconds
)

// OpLog represents log of started operation.
Expand Down
14 changes: 7 additions & 7 deletions pbm/pitr/pitr.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (e ErrOpMoved) Error() string {
const LogStartMsg = "start_ok"

// Stream streaming (saving) chunks of the oplog to the given storage
func (s *Slicer) Stream(ctx context.Context, backupSig <-chan *pbm.OPID, compression compress.CompressionType, level *int) error {
func (s *Slicer) Stream(ctx context.Context, backupSig <-chan *pbm.OPID, compression compress.CompressionType, level *int, timeouts *pbm.BackupTimeouts) error {
if s.lastTS.T == 0 {
return errors.New("no starting point defined")
}
Expand Down Expand Up @@ -290,7 +290,7 @@ func (s *Slicer) Stream(ctx context.Context, backupSig <-chan *pbm.OPID, compres
s.l.Info("got wake_up signal")
if bcp != nil {
s.l.Info("wake_up for bcp %s", bcp.String())
sliceTo, err = s.backupStartTS(bcp.String())
sliceTo, err = s.backupStartTS(bcp.String(), timeouts.StartingStatus())
if err != nil {
return errors.Wrap(err, "get backup start TS")
}
Expand Down Expand Up @@ -332,7 +332,7 @@ func (s *Slicer) Stream(ctx context.Context, backupSig <-chan *pbm.OPID, compres
// and a new worker was elected;
// - any other case (including no lock) is the undefined behaviour - return.
//
ld, err := s.getOpLock(llock)
ld, err := s.getOpLock(llock, timeouts.StartingStatus())
if err != nil {
return errors.Wrap(err, "check lock")
}
Expand Down Expand Up @@ -441,10 +441,10 @@ func formatts(t primitive.Timestamp) string {
return time.Unix(int64(t.T), 0).UTC().Format("2006-01-02T15:04:05")
}

func (s *Slicer) getOpLock(l *pbm.LockHeader) (ld pbm.LockData, err error) {
func (s *Slicer) getOpLock(l *pbm.LockHeader, t time.Duration) (ld pbm.LockData, err error) {
tk := time.NewTicker(time.Second)
defer tk.Stop()
for j := 0; j < int(pbm.WaitBackupStart.Seconds()); j++ {
for j := 0; j < int(t.Seconds()); j++ {
ld, err = s.pbm.GetLockData(l)
if err != nil && !errors.Is(err, mongo.ErrNoDocuments) {
return ld, errors.Wrap(err, "get")
Expand All @@ -458,10 +458,10 @@ func (s *Slicer) getOpLock(l *pbm.LockHeader) (ld pbm.LockData, err error) {
return ld, nil
}

func (s *Slicer) backupStartTS(opid string) (ts primitive.Timestamp, err error) {
func (s *Slicer) backupStartTS(opid string, t time.Duration) (ts primitive.Timestamp, err error) {
tk := time.NewTicker(time.Second)
defer tk.Stop()
for j := 0; j < int(pbm.WaitBackupStart.Seconds()); j++ {
for j := 0; j < int(t.Seconds()); j++ {
b, err := s.pbm.GetBackupByOPID(opid)
if err != nil && err != pbm.ErrNotFound {
return ts, errors.Wrap(err, "get backup meta")
Expand Down
16 changes: 16 additions & 0 deletions pbm/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,19 @@ type Storage interface {
// Copy makes a copy of the src objec/file under dst name
Copy(src, dst string) error
}

// ParseType parses string and returns storage type
func ParseType(s string) Type {
switch s {
case string(S3):
return S3
case string(Azure):
return Azure
case string(Filesystem):
return Filesystem
case string(BlackHole):
return BlackHole
default:
return Undef
}
}

0 comments on commit 152dc02

Please sign in to comment.