[PBM-1154] add backup.timeouts.startingStatus #863

Merged
4 commits, merged on Jul 24, 2023
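
The change makes the wait for a backup to reach the "starting" status configurable via `backup.timeouts.startingStatus` (in seconds) instead of the hard-coded `pbm.WaitBackupStart`. Below is a minimal sketch of the fallback behavior of the new `BackupTimeouts.StartingStatus()` helper added in pbm/config.go; the 120-second value is purely illustrative.

package main

import (
	"fmt"

	"github.com/percona/percona-backup-mongodb/pbm"
)

func main() {
	// With backup.timeouts unset in the PBM config, StartingStatus()
	// falls back to the built-in default pbm.WaitBackupStart (~33s).
	var unset *pbm.BackupTimeouts
	fmt.Println(unset.StartingStatus())

	// With backup.timeouts.startingStatus set (here: 120 seconds, an
	// illustrative value), agents and the CLI wait that long for a
	// backup to leave the "starting" status.
	sec := uint32(120)
	custom := &pbm.BackupTimeouts{Starting: &sec}
	fmt.Println(custom.StartingStatus()) // 2m0s
}
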
14 changes: 13 additions & 1 deletion agent/backup.go
@@ -9,6 +9,7 @@ import (
"github.com/percona/percona-backup-mongodb/pbm"
"github.com/percona/percona-backup-mongodb/pbm/backup"
"github.com/percona/percona-backup-mongodb/pbm/log"
"github.com/percona/percona-backup-mongodb/pbm/storage"
)

type currentBackup struct {
@@ -96,6 +97,17 @@ func (a *Agent) Backup(cmd *pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) {
bcp = backup.New(a.pbm, a.node)
}

cfg, err := a.pbm.GetConfig()
if err != nil {
l.Error("unable to get PBM config settings: " + err.Error())
return
}
if storage.ParseType(string(cfg.Storage.Type)) == storage.Undef {
l.Error("backups cannot be saved because PBM storage configuration hasn't been set yet")
return
}
bcp.SetTimeouts(cfg.Backup.Timeouts)

if nodeInfo.IsClusterLeader() {
balancer := pbm.BalancerModeOff
if nodeInfo.IsSharded() {
@@ -108,7 +120,7 @@ func (a *Agent) Backup(cmd *pbm.BackupCmd, opid pbm.OPID, ep pbm.Epoch) {
balancer = pbm.BalancerModeOn
}
}
err = bcp.Init(cmd, opid, nodeInfo, balancer)
err = bcp.Init(cmd, opid, nodeInfo, cfg.Storage, balancer)
if err != nil {
l.Error("init meta: %v", err)
return
2 changes: 1 addition & 1 deletion agent/restore.go
@@ -211,7 +211,7 @@ func (a *Agent) pitr() (err error) {
w: w,
})

streamErr := ibcp.Stream(ctx, w, cfg.PITR.Compression, cfg.PITR.CompressionLevel)
streamErr := ibcp.Stream(ctx, w, cfg.PITR.Compression, cfg.PITR.CompressionLevel, cfg.Backup.Timeouts)
if streamErr != nil {
switch streamErr.(type) {
case pitr.ErrOpMoved:
2 changes: 1 addition & 1 deletion cli/backup.go
@@ -134,7 +134,7 @@ func runBackup(cn *pbm.PBM, b *backupOpts, outf outFormat) (fmt.Stringer, error)
}

fmt.Printf("Starting backup '%s'", b.name)
ctx, cancel := context.WithTimeout(context.Background(), pbm.WaitBackupStart)
ctx, cancel := context.WithTimeout(context.Background(), cfg.Backup.Timeouts.StartingStatus())
defer cancel()
err = waitForBcpStatus(ctx, cn, b.name)
if err != nil {
22 changes: 12 additions & 10 deletions pbm/backup/backup.go
@@ -32,6 +32,7 @@ type Backup struct {
node *pbm.Node
typ pbm.BackupType
incrBase bool
timeouts *pbm.BackupTimeouts
}

func New(cn *pbm.PBM, node *pbm.Node) *Backup {
@@ -67,7 +68,11 @@ func NewIncremental(cn *pbm.PBM, node *pbm.Node, base bool) *Backup {
}
}

func (b *Backup) Init(bcp *pbm.BackupCmd, opid pbm.OPID, inf *pbm.NodeInfo, balancer pbm.BalancerMode) error {
func (b *Backup) SetTimeouts(t *pbm.BackupTimeouts) {
b.timeouts = t
}

func (b *Backup) Init(bcp *pbm.BackupCmd, opid pbm.OPID, inf *pbm.NodeInfo, store pbm.StorageConf, balancer pbm.BalancerMode) error {
ts, err := b.cn.ClusterTime()
if err != nil {
return errors.Wrap(err, "read cluster time")
@@ -79,6 +84,7 @@ func (b *Backup) Init(bcp *pbm.BackupCmd, opid pbm.OPID, inf *pbm.NodeInfo, bala
Name: bcp.Name,
Namespaces: bcp.Namespaces,
Compression: bcp.Compression,
Store: store,
StartTS: time.Now().Unix(),
Status: pbm.StatusStarting,
Replsets: []pbm.BackupReplset{},
@@ -90,14 +96,6 @@ func (b *Backup) Init(bcp *pbm.BackupCmd, opid pbm.OPID, inf *pbm.NodeInfo, bala
Hb: ts,
}

cfg, err := b.cn.GetConfig()
if err == pbm.ErrStorageUndefined {
return errors.New("backups cannot be saved because PBM storage configuration hasn't been set yet")
} else if err != nil {
return errors.Wrap(err, "unable to get PBM config settings")
}
meta.Store = cfg.Storage

ver, err := b.node.GetMongoVersion()
if err != nil {
return errors.WithMessage(err, "get mongo version")
@@ -232,7 +230,7 @@ func (b *Backup) Run(ctx context.Context, bcp *pbm.BackupCmd, opid pbm.OPID, l *

// Waiting for StatusStarting to move further.
// In case some preparations have to be done before the backup.
err = b.waitForStatus(bcp.Name, pbm.StatusStarting, &pbm.WaitBackupStart)
err = b.waitForStatus(bcp.Name, pbm.StatusStarting, ref(b.timeouts.StartingStatus()))
if err != nil {
return errors.Wrap(err, "waiting for start")
}
@@ -682,3 +680,7 @@ func (b *Backup) setClusterLastWrite(bcpName string) error {
err = b.cn.SetLastWrite(bcpName, lw)
return errors.Wrap(err, "set timestamp")
}

func ref[T any](v T) *T {
return &v
}
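
waitForStatus and reconcileStatus previously received &pbm.WaitBackupStart, which suggests they take a *time.Duration; the ref helper above turns the value returned by StartingStatus() into a pointer in a single expression. A tiny self-contained sketch of that pattern (the 33-second value is illustrative):

package main

import (
	"fmt"
	"time"
)

// ref returns a pointer to any value, mirroring the helper added in this PR.
func ref[T any](v T) *T { return &v }

func main() {
	// The result of a function call is not addressable, so without ref a
	// temporary variable would be needed before taking its address.
	t := ref(33 * time.Second)
	fmt.Println(*t) // 33s
}
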
2 changes: 1 addition & 1 deletion pbm/backup/logical.go
@@ -60,7 +60,7 @@ func (b *Backup) doLogical(ctx context.Context, bcp *pbm.BackupCmd, opid pbm.OPI
}

if inf.IsLeader() {
err := b.reconcileStatus(bcp.Name, opid.String(), pbm.StatusRunning, &pbm.WaitBackupStart)
err := b.reconcileStatus(bcp.Name, opid.String(), pbm.StatusRunning, ref(b.timeouts.StartingStatus()))
if err != nil {
if errors.Cause(err) == errConvergeTimeOut {
return errors.Wrap(err, "couldn't get response from all shards")
2 changes: 1 addition & 1 deletion pbm/backup/physical.go
@@ -245,7 +245,7 @@ func (b *Backup) doPhysical(ctx context.Context, bcp *pbm.BackupCmd, opid pbm.OP
}

if inf.IsLeader() {
err := b.reconcileStatus(bcp.Name, opid.String(), pbm.StatusRunning, &pbm.WaitBackupStart)
err := b.reconcileStatus(bcp.Name, opid.String(), pbm.StatusRunning, ref(b.timeouts.StartingStatus()))
if err != nil {
if errors.Cause(err) == errConvergeTimeOut {
return errors.Wrap(err, "couldn't get response from all shards")
28 changes: 26 additions & 2 deletions pbm/config.go
@@ -148,10 +148,26 @@

type BackupConf struct {
Priority map[string]float64 `bson:"priority,omitempty" json:"priority,omitempty" yaml:"priority,omitempty"`
Timeouts *BackupTimeouts `bson:"timeouts,omitempty" json:"timeouts,omitempty" yaml:"timeouts,omitempty"`
Compression compress.CompressionType `bson:"compression,omitempty" json:"compression,omitempty" yaml:"compression,omitempty"`
CompressionLevel *int `bson:"compressionLevel,omitempty" json:"compressionLevel,omitempty" yaml:"compressionLevel,omitempty"`
}

type BackupTimeouts struct {
// Starting is the timeout (in seconds) to wait for a backup to start.
Starting *uint32 `bson:"startingStatus,omitempty" json:"startingStatus,omitempty" yaml:"startingStatus,omitempty"`
}

// StartingStatus returns the timeout for waiting for a backup to reach the starting status.
// If not set or zero, it returns the default value (WaitBackupStart).
func (t *BackupTimeouts) StartingStatus() time.Duration {
if t == nil || t.Starting == nil || *t.Starting == 0 {
return WaitBackupStart
}

return time.Duration(*t.Starting) * time.Second
}

type confMap map[string]reflect.Kind

// _confmap is a list of config's valid keys and its types
@@ -222,7 +238,7 @@

// TODO: if store or pitr changed - need to bump epoch
// TODO: struct tags to config opts `pbm:"resync,epoch"`?
p.GetConfig()

Check failure on line 241 in pbm/config.go (GitHub Actions / golangci-lint): Error return value of `p.GetConfig` is not checked (errcheck)

_, err = p.Conn.Database(DB).Collection(ConfigCollection).UpdateOne(
p.ctx,
@@ -251,9 +267,17 @@
switch _confmap[key] {
case reflect.String:
v = val
case reflect.Int, reflect.Int64:
case reflect.Uint, reflect.Uint32:
v, err = strconv.ParseUint(val, 10, 32)
case reflect.Uint64:
v, err = strconv.ParseUint(val, 10, 64)
case reflect.Int, reflect.Int32:
v, err = strconv.ParseInt(val, 10, 32)
case reflect.Int64:
v, err = strconv.ParseInt(val, 10, 64)
case reflect.Float32, reflect.Float64:
case reflect.Float32:
v, err = strconv.ParseFloat(val, 32)
case reflect.Float64:
v, err = strconv.ParseFloat(val, 64)
case reflect.Bool:
v, err = strconv.ParseBool(val)
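
The widened type switch above allows uint-typed config keys to be parsed from their string form. Below is a reduced, self-contained sketch of the same parsing logic; the confKinds map is a stand-in for the package's internal _confmap, and mapping backup.timeouts.startingStatus to reflect.Uint32 is an assumption inferred from this PR rather than shown in the diff.

package main

import (
	"fmt"
	"reflect"
	"strconv"
)

// confKinds is a stand-in for pbm's _confmap: config keys mapped to the kind
// their string values must be parsed into.
var confKinds = map[string]reflect.Kind{
	"backup.timeouts.startingStatus": reflect.Uint32, // assumed mapping
}

// parseVal mirrors the switch in pbm/config.go in a reduced form.
func parseVal(key, val string) (interface{}, error) {
	switch confKinds[key] {
	case reflect.Uint, reflect.Uint32:
		return strconv.ParseUint(val, 10, 32)
	case reflect.Uint64:
		return strconv.ParseUint(val, 10, 64)
	case reflect.Int, reflect.Int32:
		return strconv.ParseInt(val, 10, 32)
	case reflect.Int64:
		return strconv.ParseInt(val, 10, 64)
	case reflect.Float32:
		return strconv.ParseFloat(val, 32)
	case reflect.Float64:
		return strconv.ParseFloat(val, 64)
	case reflect.Bool:
		return strconv.ParseBool(val)
	default:
		return val, nil
	}
}

func main() {
	v, err := parseVal("backup.timeouts.startingStatus", "120")
	fmt.Println(v, err) // 120 <nil>
}
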
3 changes: 1 addition & 2 deletions pbm/pbm.go
@@ -199,7 +199,6 @@ func (r RestoreCmd) String() string {
}

return fmt.Sprintf("name: %s, %s", r.Name, bcp)

}

type ReplayCmd struct {
@@ -237,7 +236,7 @@ const (

var (
WaitActionStart = time.Second * 15
WaitBackupStart = WaitActionStart + PITRcheckRange*12/10
WaitBackupStart = WaitActionStart + PITRcheckRange*12/10 // 33 seconds
)

// OpLog represents log of started operation.
14 changes: 7 additions & 7 deletions pbm/pitr/pitr.go
@@ -247,7 +247,7 @@ func (e ErrOpMoved) Error() string {
const LogStartMsg = "start_ok"

// Stream streams (saves) chunks of the oplog to the given storage
func (s *Slicer) Stream(ctx context.Context, backupSig <-chan *pbm.OPID, compression compress.CompressionType, level *int) error {
func (s *Slicer) Stream(ctx context.Context, backupSig <-chan *pbm.OPID, compression compress.CompressionType, level *int, timeouts *pbm.BackupTimeouts) error {
if s.lastTS.T == 0 {
return errors.New("no starting point defined")
}
@@ -290,7 +290,7 @@ func (s *Slicer) Stream(ctx context.Context, backupSig <-chan *pbm.OPID, compres
s.l.Info("got wake_up signal")
if bcp != nil {
s.l.Info("wake_up for bcp %s", bcp.String())
sliceTo, err = s.backupStartTS(bcp.String())
sliceTo, err = s.backupStartTS(bcp.String(), timeouts.StartingStatus())
if err != nil {
return errors.Wrap(err, "get backup start TS")
}
@@ -332,7 +332,7 @@ func (s *Slicer) Stream(ctx context.Context, backupSig <-chan *pbm.OPID, compres
// and a new worker was elected;
// - any other case (including no lock) is the undefined behaviour - return.
//
ld, err := s.getOpLock(llock)
ld, err := s.getOpLock(llock, timeouts.StartingStatus())
if err != nil {
return errors.Wrap(err, "check lock")
}
@@ -441,10 +441,10 @@ func formatts(t primitive.Timestamp) string {
return time.Unix(int64(t.T), 0).UTC().Format("2006-01-02T15:04:05")
}

func (s *Slicer) getOpLock(l *pbm.LockHeader) (ld pbm.LockData, err error) {
func (s *Slicer) getOpLock(l *pbm.LockHeader, t time.Duration) (ld pbm.LockData, err error) {
tk := time.NewTicker(time.Second)
defer tk.Stop()
for j := 0; j < int(pbm.WaitBackupStart.Seconds()); j++ {
for j := 0; j < int(t.Seconds()); j++ {
ld, err = s.pbm.GetLockData(l)
if err != nil && !errors.Is(err, mongo.ErrNoDocuments) {
return ld, errors.Wrap(err, "get")
@@ -458,10 +458,10 @@ func (s *Slicer) getOpLock(l *pbm.LockHeader) (ld pbm.LockData, err error) {
return ld, nil
}

func (s *Slicer) backupStartTS(opid string) (ts primitive.Timestamp, err error) {
func (s *Slicer) backupStartTS(opid string, t time.Duration) (ts primitive.Timestamp, err error) {
tk := time.NewTicker(time.Second)
defer tk.Stop()
for j := 0; j < int(pbm.WaitBackupStart.Seconds()); j++ {
for j := 0; j < int(t.Seconds()); j++ {
b, err := s.pbm.GetBackupByOPID(opid)
if err != nil && err != pbm.ErrNotFound {
return ts, errors.Wrap(err, "get backup meta")
16 changes: 16 additions & 0 deletions pbm/storage/storage.go
@@ -42,3 +42,19 @@ type Storage interface {
// Copy makes a copy of the src object/file under dst name
Copy(src, dst string) error
}

// ParseType parses a string and returns the corresponding storage type
func ParseType(s string) Type {
switch s {
case string(S3):
return S3
case string(Azure):
return Azure
case string(Filesystem):
return Filesystem
case string(BlackHole):
return BlackHole
default:
return Undef
}
}
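
ParseType is what agent/backup.go above uses to detect an unconfigured storage before starting a backup. A short usage sketch, assuming only that storage.Type is a string-based type, as the string conversions in ParseType imply:

package main

import (
	"fmt"

	"github.com/percona/percona-backup-mongodb/pbm/storage"
)

func main() {
	// Known storage types round-trip through ParseType.
	fmt.Println(storage.ParseType(string(storage.S3)) == storage.S3) // true

	// Anything unrecognized, including the empty string of an unset
	// storage section, maps to storage.Undef; that is the condition the
	// agent uses to refuse to start a backup without configured storage.
	fmt.Println(storage.ParseType("") == storage.Undef) // true
}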