diff --git a/agent/agent.go b/agent/agent.go index cb40cf6d8..7b5387b6c 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -109,8 +109,6 @@ func (a *Agent) Start() error { a.OplogReplay(cmd.Replay, cmd.OPID, ep) case pbm.CmdResync: a.Resync(cmd.OPID, ep) - case pbm.CmdPITRestore: - a.PITRestore(cmd.PITRestore, cmd.OPID, ep) case pbm.CmdDeleteBackup: a.Delete(cmd.Delete, cmd.OPID, ep) case pbm.CmdDeletePITR: @@ -447,7 +445,7 @@ func (a *Agent) acquireLock(l *pbm.Lock, lg *log.Event, acquireFn lockAquireFn) switch lk.Type { case pbm.CmdBackup: fn = a.pbm.MarkBcpStale - case pbm.CmdRestore, pbm.CmdPITRestore: + case pbm.CmdRestore: fn = a.pbm.MarkRestoreStale default: return acquireFn() diff --git a/agent/snapshot.go b/agent/backup.go similarity index 60% rename from agent/snapshot.go rename to agent/backup.go index de5b99c4f..f5e4fbc16 100644 --- a/agent/snapshot.go +++ b/agent/backup.go @@ -9,8 +9,6 @@ import ( "github.com/percona/percona-backup-mongodb/pbm" "github.com/percona/percona-backup-mongodb/pbm/backup" "github.com/percona/percona-backup-mongodb/pbm/log" - "github.com/percona/percona-backup-mongodb/pbm/restore" - "github.com/percona/percona-backup-mongodb/pbm/storage" ) type currentBackup struct { @@ -285,171 +283,3 @@ func (a *Agent) waitNomination(bcp, rs, node string, l *log.Event) (got bool, er } } } - -func (a *Agent) Restore(r *pbm.RestoreCmd, opid pbm.OPID, ep pbm.Epoch) { - if r == nil { - l := a.log.NewEvent(string(pbm.CmdRestore), "", opid.String(), ep.TS()) - l.Error("missed command") - return - } - - l := a.log.NewEvent(string(pbm.CmdRestore), r.Name, opid.String(), ep.TS()) - - var bcpType pbm.BackupType - - if r.External && r.BackupName == "" { - bcpType = pbm.ExternalBackup - } else { - l.Info("backup: %s", r.BackupName) - var stg storage.Storage - bcp, err := a.pbm.GetBackupMeta(r.BackupName) - if errors.Is(err, pbm.ErrNotFound) { - stg, err = a.pbm.GetStorage(l) - if err != nil { - l.Error("get storage: %v", err) - return - } - - bcp, err = restore.GetMetaFromStore(stg, r.BackupName) - } - if err != nil { - l.Error("get backup metadata: %v", err) - return - } - bcpType = bcp.Type - } - var err error - switch bcpType { - case pbm.PhysicalBackup, pbm.IncrementalBackup, pbm.ExternalBackup: - err = a.restorePhysical(r, opid, ep, l) - case pbm.LogicalBackup: - fallthrough - default: - err = a.restoreLogical(r, opid, ep, l) - } - - if err != nil { - l.Error("%v", err) - return - } -} - -// restoreLogical starts the restore -func (a *Agent) restoreLogical(r *pbm.RestoreCmd, opid pbm.OPID, ep pbm.Epoch, l *log.Event) error { - nodeInfo, err := a.node.GetInfo() - if err != nil { - return errors.Wrap(err, "get node info") - } - if !nodeInfo.IsPrimary { - return errors.New("node is not primary so it's unsuitable to do restore") - } - - epts := ep.TS() - lock := a.pbm.NewLock(pbm.LockHeader{ - Type: pbm.CmdRestore, - Replset: nodeInfo.SetName, - Node: nodeInfo.Me, - OPID: opid.String(), - Epoch: &epts, - }) - - got, err := a.acquireLock(lock, l, nil) - if err != nil { - return errors.Wrap(err, "acquiring lock") - } - if !got { - l.Debug("skip: lock not acquired") - return errors.New("unbale to run the restore while another operation running") - } - - defer func() { - l.Debug("releasing lock") - err := lock.Release() - if err != nil { - l.Error("release lock: %v", err) - } - }() - - l.Info("restore started") - err = restore.New(a.pbm, a.node, r.RSMap).Snapshot(r, opid, l) - if err != nil { - if errors.Is(err, restore.ErrNoDataForShard) { - l.Info("no data for the shard in 
backup, skipping") - return nil - } - - return err - } - l.Info("restore finished successfully") - - if nodeInfo.IsLeader() { - epch, err := a.pbm.ResetEpoch() - if err != nil { - return errors.Wrap(err, "reset epoch") - } - - l.Debug("epoch set to %v", epch) - } - - return nil -} - -// restoreLogical starts the restore -func (a *Agent) restorePhysical(r *pbm.RestoreCmd, opid pbm.OPID, ep pbm.Epoch, l *log.Event) error { - nodeInfo, err := a.node.GetInfo() - if err != nil { - return errors.Wrap(err, "get node info") - } - - rstr, err := restore.NewPhysical(a.pbm, a.node, nodeInfo, r.RSMap) - if err != nil { - return errors.Wrap(err, "init physical backup") - } - - // physical restore runs on all nodes in the replset - // so we try lock only on primary only to be sure there - // is no concurrent operation running. - var lock *pbm.Lock - if nodeInfo.IsPrimary { - epts := ep.TS() - lock = a.pbm.NewLock(pbm.LockHeader{ - Type: pbm.CmdRestore, - Replset: nodeInfo.SetName, - Node: nodeInfo.Me, - OPID: opid.String(), - Epoch: &epts, - }) - - got, err := a.acquireLock(lock, l, nil) - if err != nil { - return errors.Wrap(err, "acquiring lock") - } - if !got { - l.Debug("skip: lock not acquired") - return errors.New("unbale to run the restore while another operation running") - } - } - - if lock != nil { - // Don't care about errors. Anyway, the lock gonna disappear after the - // restore. And the commands stream is down as well. - // The lock also updates its heartbeats but Restore waits only for one state - // with the timeout twice as short pbm.StaleFrameSec. - lock.Release() - } - - l.Info("restore started") - err = rstr.Snapshot(r, opid, l, a.closeCMD, a.HbPause) - l.Info("restore finished %v", err) - if err != nil { - if errors.Is(err, restore.ErrNoDataForShard) { - l.Info("no data for the shard in backup, skipping") - return nil - } - - return err - } - l.Info("restore finished successfully") - - return nil -} diff --git a/agent/pitr.go b/agent/restore.go similarity index 70% rename from agent/pitr.go rename to agent/restore.go index 37779c58a..e2bbd0722 100644 --- a/agent/pitr.go +++ b/agent/restore.go @@ -258,57 +258,116 @@ func (a *Agent) pitrLockCheck() (moveOn bool, err error) { return tl.Heartbeat.T+pbm.StaleFrameSec < ts.T, nil } -// PITRestore starts the point-in-time recovery -func (a *Agent) PITRestore(r *pbm.PITRestoreCmd, opid pbm.OPID, ep pbm.Epoch) { +func (a *Agent) Restore(r *pbm.RestoreCmd, opid pbm.OPID, ep pbm.Epoch) { if r == nil { - l := a.log.NewEvent(string(pbm.CmdPITRestore), "", opid.String(), ep.TS()) + l := a.log.NewEvent(string(pbm.CmdRestore), "", opid.String(), ep.TS()) l.Error("missed command") return } - l := a.log.NewEvent(string(pbm.CmdPITRestore), r.Name, opid.String(), ep.TS()) + l := a.log.NewEvent(string(pbm.CmdRestore), r.Name, opid.String(), ep.TS()) - l.Info("to time: %s", time.Unix(r.TS, 0).UTC().Format(time.RFC3339)) + if !r.OplogTS.IsZero() { + l.Info("to time: %s", time.Unix(int64(r.OplogTS.T), 0).UTC().Format(time.RFC3339)) + } nodeInfo, err := a.node.GetInfo() if err != nil { l.Error("get node info: %v", err) return } - if !nodeInfo.IsPrimary { - l.Info("Node in not suitable for restore") - return - } - epts := ep.TS() - lock := a.pbm.NewLock(pbm.LockHeader{ - Type: pbm.CmdPITRestore, - Replset: nodeInfo.SetName, - Node: nodeInfo.Me, - OPID: opid.String(), - Epoch: &epts, - }) + var lock *pbm.Lock + if nodeInfo.IsPrimary { + epts := ep.TS() + lock = a.pbm.NewLock(pbm.LockHeader{ + Type: pbm.CmdRestore, + Replset: nodeInfo.SetName, + Node: 
nodeInfo.Me, + OPID: opid.String(), + Epoch: &epts, + }) - got, err := a.acquireLock(lock, l, nil) - if err != nil { - l.Error("acquiring lock: %v", err) - return + got, err := a.acquireLock(lock, l, nil) + if err != nil { + l.Error("acquiring lock: %v", err) + return + } + if !got { + l.Debug("skip: lock not acquired") + l.Error("unable to run the restore while another backup or restore process running") + return + } + + defer func() { + if lock == nil { + return + } + err := lock.Release() + if err != nil { + l.Error("release lock: %v", err) + } + }() } - if !got { - l.Debug("skip: lock not acquired") - l.Error("unable to run the restore while another backup or restore process running") + + stg, err := a.pbm.GetStorage(l) + if err != nil { + l.Error("get storage: %v", err) return } - defer func() { - err := lock.Release() + var bcpType pbm.BackupType + bcp := &pbm.BackupMeta{} + + if r.External && r.BackupName == "" { + bcpType = pbm.ExternalBackup + } else { + l.Info("backup: %s", r.BackupName) + if !r.OplogTS.IsZero() { + bcp, err = restore.GetBaseBackup(a.pbm, r.BackupName, r.OplogTS, stg) + } else { + bcp, err = restore.SnapshotMeta(a.pbm, r.BackupName, stg) + } if err != nil { - l.Error("release lock: %v", err) + l.Error("define base backup: %v", err) + return } - }() + bcpType = bcp.Type + } l.Info("recovery started") - err = restore.New(a.pbm, a.node, r.RSMap).PITR(r, opid, l) + + switch bcpType { + case pbm.LogicalBackup: + if !nodeInfo.IsPrimary { + l.Info("Node in not suitable for restore") + return + } + if r.OplogTS.IsZero() { + err = restore.New(a.pbm, a.node, r.RSMap).Snapshot(r, opid, l) + } else { + err = restore.New(a.pbm, a.node, r.RSMap).PITR(r, opid, l) + } + case pbm.PhysicalBackup, pbm.IncrementalBackup, pbm.ExternalBackup: + if lock != nil { + // Don't care about errors. Anyway, the lock gonna disappear after the + // restore. And the commands stream is down as well. + // The lock also updates its heartbeats but Restore waits only for one state + // with the timeout twice as short pbm.StaleFrameSec. + _ = lock.Release() + lock = nil + } + + var rstr *restore.PhysRestore + rstr, err = restore.NewPhysical(a.pbm, a.node, nodeInfo, r.RSMap) + if err != nil { + l.Error("init physical backup: %v", err) + return + } + + r.BackupName = bcp.Name + err = rstr.Snapshot(r, r.OplogTS, opid, l, a.closeCMD, a.HbPause) + } if err != nil { if errors.Is(err, restore.ErrNoDataForShard) { l.Info("no data for the shard in backup, skipping") @@ -317,15 +376,14 @@ func (a *Agent) PITRestore(r *pbm.PITRestoreCmd, opid pbm.OPID, ep pbm.Epoch) { } return } - l.Info("recovery successfully finished") - if nodeInfo.IsLeader() { + if bcpType == pbm.LogicalBackup && nodeInfo.IsLeader() { epch, err := a.pbm.ResetEpoch() if err != nil { - l.Error("reset epoch") - return + l.Error("reset epoch: %v", err) } - l.Debug("epoch set to %v", epch) } + + l.Info("recovery successfully finished") } diff --git a/cli/cli.go b/cli/cli.go index 6ee65ddeb..3d627cbf9 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -169,7 +169,7 @@ func Main() { logsCmd.Flag("tail", "Show last N entries, 20 entries are shown by default, 0 for all logs").Short('t').Default("20").Int64Var(&logs.tail) logsCmd.Flag("node", "Target node in format replset[/host:posrt]").Short('n').StringVar(&logs.node) logsCmd.Flag("severity", "Severity level D, I, W, E or F, low to high. 
Choosing one includes higher levels too.").Short('s').Default("I").EnumVar(&logs.severity, "D", "I", "W", "E", "F") - logsCmd.Flag("event", "Event in format backup[/2020-10-06T11:45:14Z]. Events: backup, restore, cancelBackup, resync, pitr, pitrestore, delete").Short('e').StringVar(&logs.event) + logsCmd.Flag("event", "Event in format backup[/2020-10-06T11:45:14Z]. Events: backup, restore, cancelBackup, resync, pitr, delete").Short('e').StringVar(&logs.event) logsCmd.Flag("opid", "Operation ID").Short('i').StringVar(&logs.opid) logsCmd.Flag("timezone", "Timezone of log output. `Local`, `UTC` or a location name corresponding to a file in the IANA Time Zone database, such as `America/New_York`").StringVar(&logs.location) logsCmd.Flag("extra", "Show extra data in text format").Hidden().Short('x').BoolVar(&logs.extr) diff --git a/cli/delete.go b/cli/delete.go index fc675079b..087383c9f 100644 --- a/cli/delete.go +++ b/cli/delete.go @@ -2,7 +2,6 @@ package cli import ( "bufio" - "context" "fmt" "io" "os" @@ -10,10 +9,7 @@ import ( "time" "github.com/pkg/errors" - "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" "github.com/percona/percona-backup-mongodb/pbm" ) @@ -347,50 +343,3 @@ func askCleanupConfirmation(info pbm.CleanupInfo) (bool, error) { return false, nil } - -func findAdjustedTS(ctx context.Context, m *mongo.Client, ts primitive.Timestamp, strict bool) (primitive.Timestamp, error) { - if strict { - rt, err := findBaseSnapshotRestoreTS(ctx, m, bson.M{"$gte": ts}, bson.D{{"last_write_ts", 1}}) - if err != nil { - if errors.Is(err, mongo.ErrNoDocuments) { - err = nil - } - return primitive.Timestamp{}, err - } - if rt.Equal(ts) { - return ts, nil - } - } - - rt, err := findBaseSnapshotRestoreTS(ctx, m, bson.M{"$lt": ts}, bson.D{{"last_write_ts", -1}}) - if err != nil { - if errors.Is(err, mongo.ErrNoDocuments) { - return ts, nil - } - return primitive.Timestamp{}, err - } - - return rt, nil -} - -func findBaseSnapshotRestoreTS(ctx context.Context, m *mongo.Client, lastWrite any, sort bson.D) (primitive.Timestamp, error) { - filter := bson.D{ - {"nss", nil}, - {"status", pbm.StatusDone}, - {"type", pbm.LogicalBackup}, - {"last_write_ts", lastWrite}, - } - options := options.FindOne(). - SetProjection(bson.D{{"last_write_ts", 1}}). - SetSort(sort) - cur := m.Database(pbm.DB).Collection(pbm.BcpCollection).FindOne(ctx, filter, options) - if err := cur.Err(); err != nil { - return primitive.Timestamp{}, errors.WithMessage(err, "query") - } - - var r struct { - RestoreTS primitive.Timestamp `bson:"last_write_ts"` - } - err := cur.Decode(&r) - return r.RestoreTS, errors.WithMessage(err, "decode") -} diff --git a/cli/restore.go b/cli/restore.go index 9442c2184..6a33b136a 100644 --- a/cli/restore.go +++ b/cli/restore.go @@ -3,6 +3,7 @@ package cli import ( "bytes" "context" + "encoding/json" "fmt" "io" "io/ioutil" @@ -11,6 +12,7 @@ import ( "strings" "time" + "github.com/mongodb/mongo-tools/common/db" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson/primitive" "gopkg.in/yaml.v2" @@ -48,9 +50,9 @@ func (r restoreRet) HasError() bool { func (r restoreRet) String() string { switch { case r.done: - m := "\nRestore successfully finished!\n" + m := fmt.Sprintf("\nRestore finished! 
Check pbm describe-restore %s", r.Name) if r.physical { - m += "Restart the cluster and pbm-agents, and run `pbm config --force-resync`" + m += " -c \nRestart the cluster and pbm-agents, and run `pbm config --force-resync`" } return m case r.err != "": @@ -109,65 +111,44 @@ func runRestore(cn *pbm.PBM, o *restoreOpts, outf outFormat) (fmt.Stringer, erro } tdiff := time.Now().Unix() - int64(clusterTime.T) - switch { - case o.bcp != "" || o.extern: - m, err := restore(cn, o, nss, rsMap, outf) + m, err := restore(cn, o, nss, rsMap, outf) + if err != nil { + return nil, err + } + if o.extern && outf == outText { + err = waitRestore(cn, m, pbm.StatusCopyReady, tdiff) if err != nil { - return nil, err - } - if o.extern && outf == outText { - err = waitRestore(cn, m, pbm.StatusCopyReady, tdiff) - if err != nil { - return nil, errors.Wrap(err, "waiting for the `copyReady` status") - } - - return externRestoreRet{Name: m.Name, Snapshot: o.bcp}, nil - } - if !o.wait { - return restoreRet{ - Name: m.Name, - Snapshot: o.bcp, - physical: m.Type == pbm.PhysicalBackup || m.Type == pbm.IncrementalBackup, - }, nil + return nil, errors.Wrap(err, "waiting for the `copyReady` status") } - typ := " logical restore.\nWaiting to finish" - if m.Type == pbm.PhysicalBackup { - typ = " physical restore.\nWaiting to finish" - } - fmt.Printf("Started%s", typ) - err = waitRestore(cn, m, pbm.StatusDone, tdiff) - if err == nil { - return restoreRet{ - done: true, - physical: m.Type == pbm.PhysicalBackup || m.Type == pbm.IncrementalBackup, - }, nil - } + return externRestoreRet{Name: m.Name, Snapshot: o.bcp}, nil + } + if !o.wait { + return restoreRet{ + Name: m.Name, + Snapshot: o.bcp, + physical: m.Type == pbm.PhysicalBackup || m.Type == pbm.IncrementalBackup, + }, nil + } - if serr, ok := err.(errRestoreFailed); ok { - return restoreRet{err: serr.Error()}, nil - } - return restoreRet{err: fmt.Sprintf("%s.\n Try to check logs on node %s", err.Error(), m.Leader)}, nil - case o.pitr != "": - m, err := pitrestore(cn, o.pitr, o.pitrBase, nss, rsMap, outf) - if err != nil { - return nil, err - } - if !o.wait { - return restoreRet{PITR: o.pitr, Name: m.Name}, nil - } - fmt.Print("Started.\nWaiting to finish") - err = waitRestore(cn, m, pbm.StatusDone, tdiff) - if err != nil { - return restoreRet{err: err.Error()}, nil - } + typ := " logical restore.\nWaiting to finish" + if m.Type == pbm.PhysicalBackup { + typ = " physical restore.\nWaiting to finish" + } + fmt.Printf("Started%s", typ) + err = waitRestore(cn, m, pbm.StatusDone, tdiff) + if err == nil { return restoreRet{ - done: true, - PITR: o.pitr, + Name: m.Name, + done: true, + physical: m.Type == pbm.PhysicalBackup || m.Type == pbm.IncrementalBackup, }, nil - default: - return nil, errors.New("undefined restore state") } + + if serr, ok := err.(errRestoreFailed); ok { + return restoreRet{err: serr.Error()}, nil + } + return restoreRet{err: fmt.Sprintf("%s.\n Try to check logs on node %s", err.Error(), m.Leader)}, nil } // We rely on heartbeats in error detection in case of all nodes failed, @@ -243,30 +224,39 @@ func (e errRestoreFailed) Error() string { return e.string } -func checkBackup(cn *pbm.PBM, o *restoreOpts, nss []string) (pbm.BackupType, error) { +func checkBackup(cn *pbm.PBM, o *restoreOpts, nss []string) (name string, typ pbm.BackupType, err error) { if o.extern && o.bcp == "" { - return pbm.ExternalBackup, nil + return "", pbm.ExternalBackup, nil + } + + if o.pitr != "" && o.pitrBase == "" { + return "", pbm.LogicalBackup, nil + } + + b := o.bcp + if 
o.pitrBase != "" { + b = o.pitrBase } - bcp, err := cn.GetBackupMeta(o.bcp) + bcp, err := cn.GetBackupMeta(b) if errors.Is(err, pbm.ErrNotFound) { - return "", errors.Errorf("backup '%s' not found", o.bcp) + return "", "", errors.Errorf("backup '%s' not found", b) } if err != nil { - return "", errors.Wrap(err, "get backup data") + return "", "", errors.Wrap(err, "get backup data") } if len(nss) != 0 && bcp.Type != pbm.LogicalBackup { - return "", errors.New("--ns flag is only allowed for logical restore") + return "", "", errors.New("--ns flag is only allowed for logical restore") } if bcp.Status != pbm.StatusDone { - return "", errors.Errorf("backup '%s' didn't finish successfully", o.bcp) + return "", "", errors.Errorf("backup '%s' didn't finish successfully", b) } - return bcp.Type, nil + return b, bcp.Type, nil } func restore(cn *pbm.PBM, o *restoreOpts, nss []string, rsMapping map[string]string, outf outFormat) (*pbm.RestoreMeta, error) { - bcpType, err := checkBackup(cn, o, nss) + bcp, bcpType, err := checkBackup(cn, o, nss) if err != nil { return nil, err } @@ -275,15 +265,27 @@ func restore(cn *pbm.PBM, o *restoreOpts, nss []string, rsMapping map[string]str return nil, err } - restore := &pbm.RestoreCmd{ - Name: time.Now().UTC().Format(time.RFC3339Nano), - BackupName: o.bcp, - Namespaces: nss, - RSMap: rsMapping, - External: o.extern, + name := time.Now().UTC().Format(time.RFC3339Nano) + + cmd := pbm.Cmd{ + Cmd: pbm.CmdRestore, + Restore: &pbm.RestoreCmd{ + Name: name, + BackupName: bcp, + Namespaces: nss, + RSMap: rsMapping, + External: o.extern, + }, } + if o.pitr != "" { + cmd.Restore.OplogTS, err = parseTS(o.pitr) + if err != nil { + return nil, err + } + } + if o.ts != "" { - restore.ExtTS, err = parseTS(o.ts) + cmd.Restore.ExtTS, err = parseTS(o.ts) if err != nil { return nil, err } @@ -298,32 +300,37 @@ func restore(cn *pbm.PBM, o *restoreOpts, nss []string, rsMapping map[string]str if err != nil { return nil, errors.Wrap(err, "unable to read config file") } - err = yaml.UnmarshalStrict(buf, &restore.ExtConf) + err = yaml.UnmarshalStrict(buf, &cmd.Restore.ExtConf) if err != nil { return nil, errors.Wrap(err, "unable to unmarshal config file") } } - err = cn.SendCmd(pbm.Cmd{ - Cmd: pbm.CmdRestore, - Restore: restore, - }) + + err = cn.SendCmd(cmd) if err != nil { return nil, errors.Wrap(err, "send command") } if outf != outText { return &pbm.RestoreMeta{ - Name: restore.Name, - Backup: o.bcp, + Name: name, + Backup: bcp, Type: bcpType, }, nil } - bcpName := o.bcp + bcpName := "" + if bcp != "" { + bcpName = fmt.Sprintf(" from '%s'", bcp) + } if o.extern { - bcpName = "[external]" + bcpName = " from [external]" + } + pitrs := "" + if o.pitr != "" { + pitrs = fmt.Sprintf(" to point-in-time %s", o.pitr) } - fmt.Printf("Starting restore %s from '%s'", restore.Name, bcpName) + fmt.Printf("Starting restore %s%s%s", name, pitrs, bcpName) var ( fn getRestoreMetaFn @@ -338,19 +345,19 @@ func restore(cn *pbm.PBM, o *restoreOpts, nss []string, rsMapping map[string]str ctx, cancel = context.WithTimeout(context.Background(), pbm.WaitActionStart) } else { ep, _ := cn.GetEpoch() - stg, err := cn.GetStorage(cn.Logger().NewEvent(string(pbm.CmdRestore), o.bcp, "", ep.TS())) + stg, err := cn.GetStorage(cn.Logger().NewEvent(string(pbm.CmdRestore), bcp, "", ep.TS())) if err != nil { return nil, errors.Wrap(err, "get storage") } fn = func(name string) (*pbm.RestoreMeta, error) { - return pbm.GetPhysRestoreMeta(name, stg, cn.Logger().NewEvent(string(pbm.CmdRestore), o.bcp, "", ep.TS())) + return 
pbm.GetPhysRestoreMeta(name, stg, cn.Logger().NewEvent(string(pbm.CmdRestore), bcp, "", ep.TS())) } ctx, cancel = context.WithTimeout(context.Background(), waitPhysRestoreStart) } defer cancel() - return waitForRestoreStatus(ctx, cn, restore.Name, fn) + return waitForRestoreStatus(ctx, cn, name, fn) } func runFinishRestore(o descrRestoreOpts) (fmt.Stringer, error) { @@ -389,45 +396,6 @@ func parseTS(t string) (ts primitive.Timestamp, err error) { return primitive.Timestamp{T: uint32(tsto.Unix()), I: 0}, nil } -func pitrestore(cn *pbm.PBM, t, base string, nss []string, rsMap map[string]string, outf outFormat) (rmeta *pbm.RestoreMeta, err error) { - ts, err := parseTS(t) - if err != nil { - return nil, err - } - - err = checkConcurrentOp(cn) - if err != nil { - return nil, err - } - - name := time.Now().UTC().Format(time.RFC3339Nano) - err = cn.SendCmd(pbm.Cmd{ - Cmd: pbm.CmdPITRestore, - PITRestore: &pbm.PITRestoreCmd{ - Name: name, - TS: int64(ts.T), - I: int64(ts.I), - Bcp: base, - Namespaces: nss, - RSMap: rsMap, - }, - }) - if err != nil { - return nil, errors.Wrap(err, "send command") - } - - if outf != outText { - return &pbm.RestoreMeta{Name: name}, nil - } - - fmt.Printf("Starting restore to the point in time '%s'", t) - - ctx, cancel := context.WithTimeout(context.Background(), pbm.WaitActionStart) - defer cancel() - - return waitForRestoreStatus(ctx, cn, name, cn.GetRestoreMeta) -} - type getRestoreMetaFn func(name string) (*pbm.RestoreMeta, error) func waitForRestoreStatus(ctx context.Context, cn *pbm.PBM, name string, getfn getRestoreMetaFn) (*pbm.RestoreMeta, error) { @@ -506,10 +474,12 @@ type describeRestoreResult struct { type RestoreReplset struct { Name string `json:"name" yaml:"name"` Status pbm.Status `json:"status" yaml:"status"` - Error *string `json:"error,omitempty" yaml:"error,omitempty"` + PartialTxn []db.Oplog `json:"partial_txn,omitempty" yaml:"-"` + PartialTxnStr *string `json:"-" yaml:"partial_txn,omitempty"` LastTransitionTS int64 `json:"last_transition_ts" yaml:"-"` LastTransitionTime string `json:"last_transition_time" yaml:"last_transition_time"` Nodes []RestoreNode `json:"nodes,omitempty" yaml:"nodes,omitempty"` + Error *string `json:"error,omitempty" yaml:"error,omitempty"` } type RestoreNode struct { @@ -598,10 +568,20 @@ func describeRestore(cn *pbm.PBM, o descrRestoreOpts) (fmt.Stringer, error) { Name: rs.Name, Status: rs.Status, LastTransitionTS: rs.LastTransitionTS, + PartialTxn: rs.PartialTxn, LastTransitionTime: time.Unix(rs.LastTransitionTS, 0).UTC().Format(time.RFC3339), } if rs.Status == pbm.StatusError { mrs.Error = &rs.Error + } else if len(mrs.PartialTxn) > 0 { + b, err := json.Marshal(mrs.PartialTxn) + if err != nil { + return res, errors.Wrap(err, "marshal partially committed transactions") + } + str := string(b) + mrs.PartialTxnStr = &str + perr := "WARNING! Some distributed transactions were not full in the oplog for this shard. But were applied on other shard(s). See the list of not applied ops in `partial_txn`." + mrs.Error = &perr } for _, node := range rs.Nodes { mnode := RestoreNode{ diff --git a/cli/status.go b/cli/status.go index ff1c70249..9607b6f64 100644 --- a/cli/status.go +++ b/cli/status.go @@ -435,7 +435,7 @@ func (c currOp) String() string { switch c.Type { default: return fmt.Sprintf("%s [op id: %s]", c.Type, c.OPID) - case pbm.CmdBackup, pbm.CmdRestore, pbm.CmdPITRestore: + case pbm.CmdBackup, pbm.CmdRestore: return fmt.Sprintf("%s \"%s\", started at %s. Status: %s. 
[op id: %s]", c.Type, c.Name, time.Unix((c.StartTS), 0).UTC().Format("2006-01-02T15:04:05Z"), c.Status, c.OPID, @@ -486,7 +486,7 @@ func getCurrOps(cn *pbm.PBM) (fmt.Stringer, error) { case pbm.StatusDumpDone: r.Status = "oplog backup" } - case pbm.CmdRestore, pbm.CmdPITRestore: + case pbm.CmdRestore: rst, err := cn.GetRestoreMetaByOPID(r.OPID) if err != nil { return r, errors.Wrap(err, "get restore info") diff --git a/e2e-tests/pkg/tests/sharded/trx.go b/e2e-tests/pkg/tests/sharded/trx.go index fce998543..67e4df698 100644 --- a/e2e-tests/pkg/tests/sharded/trx.go +++ b/e2e-tests/pkg/tests/sharded/trx.go @@ -354,7 +354,7 @@ func (c *Cluster) checkTrxCollection(ctx context.Context, col string, bcp Backup bcp.Restore() - log.Println("check commited transaction") + log.Println("check committed transaction") c.checkTrxDoc(ctx, col, 30, 1) c.checkTrxDoc(ctx, col, 530, 1) c.checkTrxDoc(ctx, col, 130, 1) @@ -366,7 +366,7 @@ func (c *Cluster) checkTrxCollection(ctx context.Context, col string, bcp Backup c.checkTrxDoc(ctx, col, 3000, 1) c.checkTrxDoc(ctx, col, 3001, 1) - log.Println("check uncommited (commit wasn't dropped to backup) transaction") + log.Println("check uncommitted (commit wasn't dropped to backup) transaction") c.checkTrxDoc(ctx, col, 0, -1) c.checkTrxDoc(ctx, col, 89, -1) c.checkTrxDoc(ctx, col, 99, -1) diff --git a/pbm/oplog/restore.go b/pbm/oplog/restore.go index 90a8153b1..1df47e9d2 100644 --- a/pbm/oplog/restore.go +++ b/pbm/oplog/restore.go @@ -25,6 +25,7 @@ import ( "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" "github.com/percona/percona-backup-mongodb/pbm" "github.com/percona/percona-backup-mongodb/pbm/snapshot" @@ -87,9 +88,8 @@ var dontPreserveUUID = []string{ // OplogRestore is the oplog applyer type OplogRestore struct { - dst *pbm.Node + dst *mongo.Client ver *db.Version - txnBuffer *txn.Buffer needIdxWorkaround bool preserveUUIDopt bool startTS primitive.Timestamp @@ -99,6 +99,11 @@ type OplogRestore struct { includeNS map[string]map[string]bool noUUIDns *ns.Matcher + // dist txn prepare entities yet to be committed + txnData map[string]Txn + // the queue of last N committed transactions + txnCommit *cqueue + txn chan pbm.RestoreTxn txnSyncErr chan error // The `T` part of the last applied op's Timestamp. 
@@ -115,8 +120,10 @@ type OplogRestore struct { filter OpFilter } +const saveLastDistTxns = 100 + // NewOplogRestore creates an object for an oplog applying -func NewOplogRestore(dst *pbm.Node, ic *idx.IndexCatalog, sv *pbm.MongoVersion, unsafe, preserveUUID bool, ctxn chan pbm.RestoreTxn, txnErr chan error) (*OplogRestore, error) { +func NewOplogRestore(dst *mongo.Client, ic *idx.IndexCatalog, sv *pbm.MongoVersion, unsafe, preserveUUID bool, ctxn chan pbm.RestoreTxn, txnErr chan error) (*OplogRestore, error) { m, err := ns.NewMatcher(append(snapshot.ExcludeFromRestore, excludeFromOplog...)) if err != nil { return nil, errors.Wrap(err, "create matcher for the collections exclude") @@ -132,6 +139,9 @@ func NewOplogRestore(dst *pbm.Node, ic *idx.IndexCatalog, sv *pbm.MongoVersion, v = append(v, 0) } } + if ic == nil { + ic = idx.NewIndexCatalog() + } ver := &db.Version{v[0], v[1], v[2]} return &OplogRestore{ dst: dst, @@ -146,6 +156,8 @@ func NewOplogRestore(dst *pbm.Node, ic *idx.IndexCatalog, sv *pbm.MongoVersion, txnSyncErr: txnErr, unsafe: unsafe, filter: DefaultOpFilter, + txnData: make(map[string]Txn), + txnCommit: newCQueue(saveLastDistTxns), }, nil } @@ -172,9 +184,6 @@ func (o *OplogRestore) Apply(src io.ReadCloser) (lts primitive.Timestamp, err er bsonSource := db.NewDecodedBSONSource(db.NewBufferlessBSONSource(src)) defer bsonSource.Close() - o.txnBuffer = txn.NewBuffer() - defer func() { o.txnBuffer.Stop() }() // it basically never returns an error - for { rawOplogEntry := bsonSource.LoadNext() if rawOplogEntry == nil { @@ -336,9 +345,19 @@ func (o *OplogRestore) handleOp(oe db.Oplog) error { return nil } -func isPrepareTxn(op *db.Oplog) bool { +func isTxnOps(op *db.Oplog) bool { + for _, v := range op.Object { + if v.Key == "applyOps" { + return true + } + } + + return false +} + +func isPartial(op *db.Oplog) bool { for _, v := range op.Object { - if v.Key == "prepare" { + if v.Key == "partialTxn" { return true } } @@ -346,6 +365,13 @@ func isPrepareTxn(op *db.Oplog) bool { return false } +type Txn struct { + Oplog []db.Oplog + meta txn.Meta + applyOps []db.Oplog + allOps bool +} + // handleTxnOp accumulates transaction's ops in a buffer and then applies // it as non-txn ops when observes commit or just purge the buffer if the transaction // aborted. @@ -362,33 +388,30 @@ func isPrepareTxn(op *db.Oplog) bool { // "ts" : Timestamp(1644410656, 8), ... } // // Since we sync backup/restore across shards by `ts` (`opTime`), artifacts of such transaction +// would be visible on shard `rs2` and won't appear on `rs1` given the restore time is `(1644410656, 8)`. // -// would be visible on shard `rs2` and won't appear on `rs1` given the restore time is `(1644410656, 8)`. -// -// To avoid that we have to check if a distributed transaction was committed on all -// participated shards before committing such transaction. -// - Encountering `prepare` statement `handleTxnOp` would send such txn to the caller. -// - Bumping into `commitTransaction` it will send txn again with respective state and -// commit time. And waits for the response from the caller with either "commit" or "abort" state. -// - On "abort" transactions buffer will be purged, hence all ops are discarded. +// We treat distributed transactions as non-distributed - apply ops once +// a commit message for this txn is encountered. 
But store uncommitted dist txns +// so applier check in the end if any of them committed on other shards +// (and commit if so) func (o *OplogRestore) handleTxnOp(meta txn.Meta, op db.Oplog) error { - err := o.txnBuffer.AddOp(meta, op) - if err != nil { - return errors.Wrap(err, "buffering entry") - } + txnID := fmt.Sprintf("%s-%d", base64.RawStdEncoding.EncodeToString([]byte(op.LSID)), *op.TxnNumber) - if o.txn != nil && isPrepareTxn(&op) { - o.txn <- pbm.RestoreTxn{ - ID: fmt.Sprintf("%s-%d", base64.RawStdEncoding.EncodeToString([]byte(op.LSID)), *op.TxnNumber), - State: pbm.TxnPrepare, + if isTxnOps(&op) { + ops, err := txnInnerOps(&op) + if err != nil { + return err } - } + t := o.txnData[txnID] + t.meta = meta + t.allOps = !isPartial(&op) + t.Oplog = append(t.Oplog, op) + t.applyOps = append(t.applyOps, ops...) + o.txnData[txnID] = t + } if meta.IsAbort() { - err := o.txnBuffer.PurgeTxn(meta) - if err != nil { - return errors.Wrap(err, "cleaning up transaction buffer on abort") - } + delete(o.txnData, txnID) return nil } @@ -396,67 +419,174 @@ func (o *OplogRestore) handleTxnOp(meta txn.Meta, op db.Oplog) error { return nil } - // if the "commit" contains data, it's a non distributed transaction - // and we can apply it now. Otherwise we have to confirm it was committed - // on all participated shards. - if o.txn != nil && !meta.IsData() { + // The first op of the current chunk euquals to the last of the previous. + // It's done to ensure no gaps in between chunks. Although and oplog ops are + // idempotent, if the duplication gonna be a commit message of the distributed + // txn, the second commit gonna fail since we're clearing committed tnxs out + // of the buffer. So just skip if it is a commit duplication. + lastc := o.txnCommit.last() + if lastc != nil && txnID == lastc.ID { + return nil + } + + // if the "commit" contains no data, it's a distributed transaction and we + // preserve it and communicate later to another shards so they can apply + // prepared txn if its commit didn't get into the oplog time range + if !meta.IsData() { var cts primitive.Timestamp for _, v := range op.Object { if v.Key == "commitTimestamp" { cts = v.Value.(primitive.Timestamp) } } - id := fmt.Sprintf("%s-%d", base64.RawStdEncoding.EncodeToString([]byte(op.LSID)), *op.TxnNumber) - o.txn <- pbm.RestoreTxn{ - ID: id, + + o.txnCommit.push(pbm.RestoreTxn{ + ID: txnID, Ctime: cts, State: pbm.TxnCommit, - } + }) + } - select { - case txn := <-o.txn: - if txn.State == pbm.TxnAbort { - err := o.txnBuffer.PurgeTxn(meta) - if err != nil { - return errors.Wrapf(err, "cleaning up txn [%s] buffer on abort", id) - } - return nil - } - case err := <-o.txnSyncErr: - return errors.Wrapf(err, "distributed txn [%s] sync failed with", id) + // commit transaction + err := o.applyTxn(txnID) + if err != nil { + b, _ := json.MarshalIndent(op, "", " ") + return errors.Wrapf(err, "apply txn: %s", b) + } + + delete(o.txnData, txnID) + + return nil +} + +const extractErrorFmt = "error extracting transaction ops: %s: %v" + +func txnInnerOps(txnOp *db.Oplog) ([]db.Oplog, error) { + doc := txnOp.Object + rawAO, err := bsonutil.FindValueByKey("applyOps", &doc) + if err != nil { + return nil, fmt.Errorf(extractErrorFmt, "applyOps field", err) + } + + ao, ok := rawAO.(bson.A) + if !ok { + return nil, fmt.Errorf(extractErrorFmt, "applyOps field", "not a BSON array") + } + + ops := make([]db.Oplog, len(ao)) + for i, v := range ao { + opDoc, ok := v.(bson.D) + if !ok { + return nil, fmt.Errorf(extractErrorFmt, "applyOps op", "not a BSON 
document") } + op, err := bsonDocToOplog(opDoc) + if err != nil { + return nil, fmt.Errorf(extractErrorFmt, "applyOps op", err) + } + + // The inner ops doesn't have these fields, + // so we are assigning them from the parent transaction op + op.Timestamp = txnOp.Timestamp + op.Term = txnOp.Term + op.Hash = txnOp.Hash + + ops[i] = *op } - // From here, we're applying transaction entries - ops, errs := o.txnBuffer.GetTxnStream(meta) + return ops, nil +} + +const opConvertErrorFmt = "error converting bson.D to op: %s: %v" -Loop: - for { - select { - case op, ok := <-ops: +func bsonDocToOplog(doc bson.D) (*db.Oplog, error) { + op := db.Oplog{} + + for _, v := range doc { + switch v.Key { + case "op": + s, ok := v.Value.(string) if !ok { - break Loop + return nil, fmt.Errorf(opConvertErrorFmt, "op field", "not a string") } - err = o.handleNonTxnOp(op) - if err != nil { - return errors.Wrap(err, "applying transaction op") + op.Operation = s + case "ns": + s, ok := v.Value.(string) + if !ok { + return nil, fmt.Errorf(opConvertErrorFmt, "ns field", "not a string") } - case err := <-errs: - if err != nil { - return errors.Wrap(err, "replaying transaction") + op.Namespace = s + case "o": + d, ok := v.Value.(bson.D) + if !ok { + return nil, fmt.Errorf(opConvertErrorFmt, "o field", "not a BSON Document") + } + op.Object = d + case "o2": + d, ok := v.Value.(bson.D) + if !ok { + return nil, fmt.Errorf(opConvertErrorFmt, "o2 field", "not a BSON Document") + } + op.Query = d + case "ui": + u, ok := v.Value.(primitive.Binary) + if !ok { + return nil, fmt.Errorf(opConvertErrorFmt, "ui field", "not binary data") } - break Loop + op.UI = &u } } - err = o.txnBuffer.PurgeTxn(meta) - if err != nil { - return errors.Wrap(err, "cleaning up transaction buffer") + return &op, nil +} + +func (o *OplogRestore) applyTxn(id string) (err error) { + t, ok := o.txnData[id] + if !ok { + return errors.Errorf("unknown transaction id %s", id) + } + + for _, op := range t.applyOps { + err = o.handleNonTxnOp(op) + if err != nil { + return errors.Wrap(err, "applying transaction op") + } } + delete(o.txnData, id) return nil } +func (o *OplogRestore) TxnLeftovers() (uncommitted map[string]Txn, lastCommits []pbm.RestoreTxn) { + return o.txnData, o.txnCommit.s +} + +func (o *OplogRestore) HandleUncommittedTxn(commits map[string]primitive.Timestamp) (partial, uncommitted []Txn, err error) { + if len(o.txnData) == 0 { + return nil, nil, nil + } + + for id, t := range o.txnData { + if _, ok := commits[id]; ok { + if !t.allOps { + partial = append(partial, t) + continue + } + + err := o.applyTxn(id) + if err != nil { + return partial, uncommitted, errors.Wrapf(err, "applying uncommitted txn %s", id) + } + delete(o.txnData, id) + } + } + + for _, t := range o.txnData { + uncommitted = append(uncommitted, t) + } + + return partial, uncommitted, nil +} + func (o *OplogRestore) handleNonTxnOp(op db.Oplog) error { // have to handle it here one more time because before the op gets thru // txnBuffer its namespace is `collection.$cmd` instead of the real one @@ -618,6 +748,31 @@ func (o *OplogRestore) handleNonTxnOp(op db.Oplog) error { return nil } +type cqueue struct { + s []pbm.RestoreTxn + c int +} + +func newCQueue(cap int) *cqueue { + return &cqueue{s: make([]pbm.RestoreTxn, 0, cap), c: cap} +} + +func (c *cqueue) push(v pbm.RestoreTxn) { + if len(c.s) == c.c { + c.s = c.s[1:] + } + + c.s = append(c.s, v) +} + +func (c *cqueue) last() *pbm.RestoreTxn { + if len(c.s) == 0 { + return nil + } + + return &c.s[len(c.s)-1] +} + // 
extractIndexDocumentFromCommitIndexBuilds extracts the index specs out of "createIndexes" oplog entry and convert to IndexDocument // returns collection name and index spec func extractIndexDocumentFromCreateIndexes(op db.Oplog) (string, *idx.IndexDocument) { @@ -677,7 +832,7 @@ func extractIndexDocumentFromCommitIndexBuilds(op db.Oplog) (string, []*idx.Inde // applyOps is a wrapper for the applyOps database command, we pass in // a session to avoid opening a new connection for a few inserts at a time. func (o *OplogRestore) applyOps(entries []interface{}) error { - singleRes := o.dst.Session().Database("admin").RunCommand(context.TODO(), bson.D{{"applyOps", entries}}) + singleRes := o.dst.Database("admin").RunCommand(context.TODO(), bson.D{{"applyOps", entries}}) if err := singleRes.Err(); err != nil { return errors.Wrap(err, "applyOps") } diff --git a/pbm/pbm.go b/pbm/pbm.go index 0c36d7293..4b269211a 100644 --- a/pbm/pbm.go +++ b/pbm/pbm.go @@ -69,7 +69,6 @@ const ( CmdCancelBackup Command = "cancelBackup" CmdResync Command = "resync" CmdPITR Command = "pitr" - CmdPITRestore Command = "pitrestore" CmdDeleteBackup Command = "delete" CmdDeletePITR Command = "deletePitr" CmdCleanup Command = "cleanup" @@ -89,8 +88,6 @@ func (c Command) String() string { return "Resync storage" case CmdPITR: return "PITR incremental backup" - case CmdPITRestore: - return "PITR restore" case CmdDeleteBackup: return "Delete" case CmdDeletePITR: @@ -109,7 +106,6 @@ type Cmd struct { Backup *BackupCmd `bson:"backup,omitempty"` Restore *RestoreCmd `bson:"restore,omitempty"` Replay *ReplayCmd `bson:"replay,omitempty"` - PITRestore *PITRestoreCmd `bson:"pitrestore,omitempty"` Delete *DeleteBackupCmd `bson:"delete,omitempty"` DeletePITR *DeletePITRCmd `bson:"deletePitr,omitempty"` Cleanup *CleanupCmd `bson:"cleanup,omitempty"` @@ -148,10 +144,6 @@ func (c Cmd) String() string { buf.WriteString(" [") buf.WriteString(c.Restore.String()) buf.WriteString("]") - case CmdPITRestore: - buf.WriteString(" [") - buf.WriteString(c.PITRestore.String()) - buf.WriteString("]") } buf.WriteString(" 0 { - bcp += fmt.Sprintf(" ts: %d,%d", r.ExtTS.T, r.ExtTS.I) + bcp += fmt.Sprintf(" external ts: <%d,%d>", r.ExtTS.T, r.ExtTS.I) + } + if r.OplogTS.T > 0 { + bcp += fmt.Sprintf(" point-in-time: <%d,%d>", r.OplogTS.T, r.OplogTS.I) } return fmt.Sprintf("name: %s, %s", r.Name, bcp) @@ -216,22 +213,6 @@ func (c ReplayCmd) String() string { return fmt.Sprintf("name: %s, time: %d - %d", c.Name, c.Start, c.End) } -type PITRestoreCmd struct { - Name string `bson:"name"` - TS int64 `bson:"ts"` - I int64 `bson:"i"` - Bcp string `bson:"bcp"` - Namespaces []string `bson:"nss,omitempty"` - RSMap map[string]string `bson:"rsMap,omitempty"` -} - -func (p PITRestoreCmd) String() string { - if p.Bcp != "" { - return fmt.Sprintf("name: %s, point-in-time ts: %d, base-snapshot: %s", p.Name, p.TS, p.Bcp) - } - return fmt.Sprintf("name: %s, point-in-time ts: %d", p.Name, p.TS) -} - type DeleteBackupCmd struct { Backup string `bson:"backup"` OlderThan int64 `bson:"olderthan"` diff --git a/pbm/restore.go b/pbm/restore.go index 79fc895ea..588d9f592 100644 --- a/pbm/restore.go +++ b/pbm/restore.go @@ -1,10 +1,13 @@ package pbm import ( + "bytes" "fmt" "sort" + "strconv" "time" + "github.com/mongodb/mongo-tools/common/db" "github.com/percona/percona-backup-mongodb/pbm/storage/s3" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" @@ -36,14 +39,40 @@ type RestoreMeta struct { } type RestoreStat struct { - Download map[string]map[string]s3.DownloadStat 
`bson:"download,omitempty" json:"download,omitempty"` + RS map[string]map[string]RestoreRSMetrics `bson:"rs,omitempty" json:"rs,omitempty"` +} +type RestoreRSMetrics struct { + DistTxn DistTxnStat `bson:"txn,omitempty" json:"txn,omitempty"` + Download s3.DownloadStat `bson:"download,omitempty" json:"download,omitempty"` +} + +type DistTxnStat struct { + // Partial is the num of transactions that were allied on other shards + // but can't be applied on this one since not all prepare messages got + // into the oplog (shouldn't happen). + Partial int `bson:"partial" json:"partial"` + // ShardUncommitted is the number of uncommitted transactions before + // the sync. Basically, the transaction is full but no commit message + // in the oplog of this shard. + ShardUncommitted int `bson:"shard_uncommitted" json:"shard_uncommitted"` + // LeftUncommitted is the num of transactions that remain uncommitted + // after the sync. The transaction is full but no commit message in the + // oplog of any shard. + LeftUncommitted int `bson:"left_uncommitted" json:"left_uncommitted"` +} + +type RestoreShardStat struct { + Txn DistTxnStat `json:"txn"` + D *s3.DownloadStat `json:"d"` } type RestoreReplset struct { Name string `bson:"name" json:"name"` StartTS int64 `bson:"start_ts" json:"start_ts"` Status Status `bson:"status" json:"status"` - Txn RestoreTxn `bson:"txn" json:"txn"` + CommittedTxn []RestoreTxn `bson:"committed_txn" json:"committed_txn"` + CommittedTxnSet bool `bson:"txn_set" json:"txn_set"` + PartialTxn []db.Oplog `bson:"partial_txn" json:"partial_txn"` CurrentOp primitive.Timestamp `bson:"op" json:"op"` LastTransitionTS int64 `bson:"last_transition_ts" json:"last_transition_ts"` LastWriteTS primitive.Timestamp `bson:"last_write_ts" json:"last_write_ts"` @@ -51,6 +80,7 @@ type RestoreReplset struct { Error string `bson:"error,omitempty" json:"error,omitempty"` Conditions Conditions `bson:"conditions" json:"conditions"` Hb primitive.Timestamp `bson:"hb" json:"hb"` + Stat RestoreShardStat `bson:"stat" json:"stat"` } type Conditions []*Condition @@ -91,15 +121,76 @@ type RestoreTxn struct { State TxnState `bson:"state" json:"state"` } +func (t RestoreTxn) Encode() []byte { + return []byte(fmt.Sprintf("txn:%d,%d:%s:%s", t.Ctime.T, t.Ctime.I, t.ID, t.State)) +} + +func (t *RestoreTxn) Decode(b []byte) error { + for k, v := range bytes.SplitN(bytes.TrimSpace(b), []byte{':'}, 4) { + switch k { + case 0: + case 1: + if si := bytes.SplitN(v, []byte{','}, 2); len(si) == 2 { + tt, err := strconv.ParseInt(string(si[0]), 10, 64) + if err != nil { + return errors.Wrap(err, "parse clusterTime T") + } + ti, err := strconv.ParseInt(string(si[1]), 10, 64) + if err != nil { + return errors.Wrap(err, "parse clusterTime I") + } + + t.Ctime = primitive.Timestamp{T: uint32(tt), I: uint32(ti)} + } + case 2: + t.ID = string(v) + case 3: + t.State = TxnState(string(v)) + } + } + + return nil +} + func (t RestoreTxn) String() string { return fmt.Sprintf("<%s> [%s] %v", t.ID, t.State, t.Ctime) } -func (p *PBM) RestoreSetRSTxn(name string, rsName string, txn RestoreTxn) error { +func (p *PBM) RestoreSetRSTxn(name string, rsName string, txn []RestoreTxn) error { + _, err := p.Conn.Database(DB).Collection(RestoresCollection).UpdateOne( + p.ctx, + bson.D{{"name", name}, {"replsets.name", rsName}}, + bson.D{{"$set", bson.M{"replsets.$.committed_txn": txn, "replsets.$.txn_set": true}}}, + ) + + return err +} + +func (p *PBM) RestoreSetRSStat(name string, rsName string, stat RestoreShardStat) error { + _, err := 
p.Conn.Database(DB).Collection(RestoresCollection).UpdateOne( + p.ctx, + bson.D{{"name", name}, {"replsets.name", rsName}}, + bson.D{{"$set", bson.M{"replsets.$.stat": stat}}}, + ) + + return err +} + +func (p *PBM) RestoreSetStat(name string, stat RestoreStat) error { + _, err := p.Conn.Database(DB).Collection(RestoresCollection).UpdateOne( + p.ctx, + bson.D{{"name", name}}, + bson.D{{"$set", bson.M{"stat": stat}}}, + ) + + return err +} + +func (p *PBM) RestoreSetRSPartTxn(name string, rsName string, txn []db.Oplog) error { _, err := p.Conn.Database(DB).Collection(RestoresCollection).UpdateOne( p.ctx, bson.D{{"name", name}, {"replsets.name", rsName}}, - bson.D{{"$set", bson.M{"replsets.$.txn": txn}}}, + bson.D{{"$set", bson.M{"replsets.$.partial_txn": txn}}}, ) return err diff --git a/pbm/restore/logical.go b/pbm/restore/logical.go index 6a020cb0d..a9f1a38bb 100644 --- a/pbm/restore/logical.go +++ b/pbm/restore/logical.go @@ -9,8 +9,8 @@ import ( "strings" "time" - "github.com/golang/snappy" "github.com/mongodb/mongo-tools/common/bsonutil" + "github.com/mongodb/mongo-tools/common/db" "github.com/mongodb/mongo-tools/common/idx" "github.com/mongodb/mongo-tools/mongorestore" "github.com/pkg/errors" @@ -50,9 +50,8 @@ type Restore struct { // empty if all shard names are the same sMap map[string]string - oplog *oplog.OplogRestore - log *log.Event - opid string + log *log.Event + opid string indexCatalog *idx.IndexCatalog } @@ -95,7 +94,7 @@ func (r *Restore) exit(err error, l *log.Event) { func (r *Restore) Snapshot(cmd *pbm.RestoreCmd, opid pbm.OPID, l *log.Event) (err error) { defer func() { r.exit(err, l) }() // !!! has to be in a closure - bcp, err := r.SnapshotMeta(cmd.BackupName) + bcp, err := SnapshotMeta(r.cn, cmd.BackupName, r.stg) if err != nil { return err } @@ -202,7 +201,7 @@ func newConfigsvrOpFilter(nss []string) oplog.OpFilter { } // PITR do the Point-in-Time Recovery -func (r *Restore) PITR(cmd *pbm.PITRestoreCmd, opid pbm.OPID, l *log.Event) (err error) { +func (r *Restore) PITR(cmd *pbm.RestoreCmd, opid pbm.OPID, l *log.Event) (err error) { defer func() { r.exit(err, l) }() // !!! has to be in a closure err = r.init(cmd.Name, opid, l) @@ -210,24 +209,10 @@ func (r *Restore) PITR(cmd *pbm.PITRestoreCmd, opid pbm.OPID, l *log.Event) (err return err } - tsTo := primitive.Timestamp{T: uint32(cmd.TS), I: uint32(cmd.I)} - var bcp *pbm.BackupMeta - if cmd.Bcp == "" { - bcp, err = r.cn.GetLastBackup(&tsTo) - if errors.Is(err, pbm.ErrNotFound) { - return errors.Errorf("no backup found before ts %v", tsTo) - } - if err != nil { - return errors.Wrap(err, "define last backup") - } - } else { - bcp, err = r.SnapshotMeta(cmd.Bcp) - if err != nil { - return err - } - if primitive.CompareTimestamp(bcp.LastWriteTS, tsTo) >= 0 { - return errors.New("snapshot's last write is later than the target time. Try to set an earlier snapshot. 
Or leave the snapshot empty so PBM will choose one.") - } + // tsTo := primitive.Timestamp{T: uint32(cmd.TS), I: uint32(cmd.I)} + bcp, err := GetBaseBackup(r.cn, cmd.BackupName, cmd.OplogTS, r.stg) + if err != nil { + return errors.Wrap(err, "get base backup") } nss := cmd.Namespaces @@ -236,7 +221,7 @@ func (r *Restore) PITR(cmd *pbm.PITRestoreCmd, opid pbm.OPID, l *log.Event) (err } if r.nodeInfo.IsLeader() { - err = r.cn.SetOplogTimestamps(r.name, 0, int64(tsTo.T)) + err = r.cn.SetOplogTimestamps(r.name, 0, int64(cmd.OplogTS.T)) if err != nil { return errors.Wrap(err, "set PITR timestamp") } @@ -270,7 +255,7 @@ func (r *Restore) PITR(cmd *pbm.PITRestoreCmd, opid pbm.OPID, l *log.Event) (err r.sMap = r.getShardMapping(bcp) } - chunks, err := r.chunks(bcp.LastWriteTS, tsTo) + chunks, err := r.chunks(bcp.LastWriteTS, cmd.OplogTS) if err != nil { return err } @@ -303,7 +288,7 @@ func (r *Restore) PITR(cmd *pbm.PITRestoreCmd, opid pbm.OPID, l *log.Event) (err EndTS: bcp.LastWriteTS, } - oplogOption := applyOplogOption{end: &tsTo, nss: nss} + oplogOption := applyOplogOption{end: &cmd.OplogTS, nss: nss} if r.nodeInfo.IsConfigSrv() && sel.IsSelective(nss) { oplogOption.nss = []string{"config.databases"} oplogOption.filter = newConfigsvrOpFilter(nss) @@ -363,7 +348,7 @@ func (r *Restore) ReplayOplog(cmd *pbm.ReplayCmd, opid pbm.OPID, l *log.Event) ( return r.Done() // skip. no oplog for current rs } - chunks, err := r.chunks(cmd.Start, cmd.End) + opChunks, err := r.chunks(cmd.Start, cmd.End) if err != nil { return err } @@ -378,7 +363,7 @@ func (r *Restore) ReplayOplog(cmd *pbm.ReplayCmd, opid pbm.OPID, l *log.Event) ( end: &cmd.End, unsafe: true, } - if err = r.applyOplog(chunks, &oplogOption); err != nil { + if err = r.applyOplog(opChunks, &oplogOption); err != nil { return err } @@ -492,46 +477,19 @@ func (r *Restore) checkTopologyForOplog(ctx context.Context, currShards []pbm.Sh // is contiguous - there are no gaps), checks for respective files on storage and returns // chunks list if all checks passed func (r *Restore) chunks(from, to primitive.Timestamp) ([]pbm.OplogChunk, error) { - mapRevRS := pbm.MakeReverseRSMapFunc(r.rsMap) - chunks, err := r.cn.PITRGetChunksSlice(mapRevRS(r.nodeInfo.SetName), from, to) - if err != nil { - return nil, errors.Wrap(err, "get chunks index") - } - - if len(chunks) == 0 { - return nil, errors.New("no chunks found") - } - - if primitive.CompareTimestamp(chunks[len(chunks)-1].EndTS, to) == -1 { - return nil, errors.Errorf("no chunk with the target time, the last chunk ends on %v", chunks[len(chunks)-1].EndTS) - } - - last := from - for _, c := range chunks { - if primitive.CompareTimestamp(last, c.StartTS) == -1 { - return nil, errors.Errorf("integrity vilolated, expect chunk with start_ts %v, but got %v", last, c.StartTS) - } - last = c.EndTS - - _, err := r.stg.FileStat(c.FName) - if err != nil { - return nil, errors.Errorf("failed to ensure chunk %v.%v on the storage, file: %s, error: %v", c.StartTS, c.EndTS, c.FName, err) - } - } - - return chunks, nil + return chunks(r.cn, r.stg, from, to, r.nodeInfo.SetName, r.rsMap) } -func (r *Restore) SnapshotMeta(backupName string) (bcp *pbm.BackupMeta, err error) { - bcp, err = r.cn.GetBackupMeta(backupName) +func SnapshotMeta(cn *pbm.PBM, backupName string, stg storage.Storage) (bcp *pbm.BackupMeta, err error) { + bcp, err = cn.GetBackupMeta(backupName) if errors.Is(err, pbm.ErrNotFound) { - bcp, err = GetMetaFromStore(r.stg, backupName) + bcp, err = GetMetaFromStore(stg, backupName) } if err != nil { return nil, 
errors.Wrap(err, "get backup metadata") } - return bcp, err + return bcp, nil } // setShards defines and set shards participating in the restore @@ -955,277 +913,104 @@ func updateChunksRouterTable(ctx context.Context, m *mongo.Client, sMap map[stri return errors.WithMessage(err, "bulk write") } -func (r *Restore) distTxnChecker(done <-chan struct{}, ctxn chan pbm.RestoreTxn, txnSyncErr chan<- error) { - defer r.log.Debug("exit distTxnChecker") - - for { - select { - case txn := <-ctxn: - r.log.Debug("found dist txn: %s", txn) - err := r.cn.RestoreSetRSTxn(r.name, r.nodeInfo.SetName, txn) - if err != nil { - txnSyncErr <- errors.Wrapf(err, "set transaction %v", txn) - return - } - - if txn.State == pbm.TxnCommit { - txn.State, err = r.txnWaitForShards(txn) - if err != nil { - txnSyncErr <- errors.Wrapf(err, "wait transaction %v", txn) - return - } - r.log.Debug("txn converged to [%s] <%s>", txn.State, txn.ID) - ctxn <- txn - } - case <-done: - return - } - } -} - -func (r *Restore) waitingTxnChecker(e *error, done <-chan struct{}) { - defer r.log.Debug("exit waitingTxnChecker") - - tk := time.NewTicker(time.Second * 1) - defer tk.Stop() - - observedTxn := make(map[string]struct{}) - for { - select { - case <-tk.C: - err := r.checkWaitingTxns(observedTxn) - if err != nil { - *e = err - return - } - case <-done: - return - } - } -} - -type applyOplogOption struct { - start *primitive.Timestamp - end *primitive.Timestamp - nss []string - unsafe bool - filter oplog.OpFilter +func (r *Restore) setcommittedTxn(txn []pbm.RestoreTxn) error { + return r.cn.RestoreSetRSTxn(r.name, r.nodeInfo.SetName, txn) } -// In order to sync distributed transactions (commit ontly when all participated shards are committed), -// on all participated in the retore agents: -// distTxnChecker: -// - Receiving distributed transactions from the oplog applier, will add set it to the shards restore meta. -// - If txn's state is `commit` it will wait from all of the rest of the shards for: -// - either transaction committed on the shard (have `commitTransaction`); -// - or shard didn't participate in the transaction (more on that below); -// - or shard participated in the txn (have prepare ops) but no `commitTransaction` by the end of the oplog. -// If any of the shards encounters the latter - the transaction is sent back to the applier as aborted. -// Otherwise - committed. -// -// By looking at just transactions in the oplog we can't tell which shards were participating in it. So given that -// starting `opTime` of the transaction is the same on all shards, we can assume that if some shard(s) oplog applier -// observed greater than the txn's `opTime` and hadn't seen such txn - it wasn't part of this transaction at all. To -// communicate that, each agent runs `checkWaitingTxns` which in turn periodically checks restore metadata and sees -// any new (unobserved before) waiting for the transaction, posts last observed opTime. We go with `checkWaitingTxns` -// instead of just updating each observed `opTime` since the latter would add an extra 1 write to each oplog op on -// sharded clusters even if there are no dist txns at all. 
-func (r *Restore) applyOplog(chunks []pbm.OplogChunk, options *applyOplogOption) error { - r.log.Info("starting oplog replay") - var err error - - mgoV, err := r.node.GetMongoVersion() - if err != nil || len(mgoV.Version) < 1 { - return errors.Wrap(err, "define mongo version") - } +func (r *Restore) getcommittedTxn() (map[string]primitive.Timestamp, error) { + txn := make(map[string]primitive.Timestamp) - var ( - ctxn chan pbm.RestoreTxn - txnSyncErr chan error - ) - if r.nodeInfo.IsSharded() { - ctxn = make(chan pbm.RestoreTxn) - txnSyncErr = make(chan error) + shards := make(map[string]struct{}) + for _, s := range r.shards { + shards[s.RS] = struct{}{} } - r.oplog, err = oplog.NewOplogRestore(r.node, r.indexCatalog, mgoV, options.unsafe, true, ctxn, txnSyncErr) - if err != nil { - return errors.Wrap(err, "create oplog") - } - - r.oplog.SetOpFilter(options.filter) - - var startTS, endTS primitive.Timestamp - if options.start != nil { - startTS = *options.start - } - if options.end != nil { - endTS = *options.end - } - r.oplog.SetTimeframe(startTS, endTS) - r.oplog.SetIncludeNS(options.nss) - - var waitTxnErr error - if r.nodeInfo.IsSharded() { - r.log.Debug("starting sharded txn sync") - if len(r.shards) == 0 { - return errors.New("participating shards undefined") - } - done := make(chan struct{}) - go r.distTxnChecker(done, ctxn, txnSyncErr) - go r.waitingTxnChecker(&waitTxnErr, done) - defer close(done) - } - - var lts primitive.Timestamp - for _, chnk := range chunks { - r.log.Debug("+ applying %v", chnk) - - // If the compression is Snappy and it failed we try S2. - // Up until v1.7.0 the compression of pitr chunks was always S2. - // But it was a mess in the code which lead to saving pitr chunk files - // with the `.snappy`` extension although it was S2 in fact. And during - // the restore, decompression treated .snappy as S2 ¯\_(ツ)_/¯ It wasn’t - // an issue since there was no choice. Now, Snappy produces `.snappy` files - // and S2 - `.s2` which is ok. But this means the old chunks (made by previous - // PBM versions) won’t be compatible - during the restore, PBM will treat such - // files as Snappy (judging by its suffix) but in fact, they are s2 files - // and restore will fail with snappy: corrupt input. So we try S2 in such a case. 
- lts, err = r.replayChunk(chnk.FName, chnk.Compression) - if err != nil && errors.Is(err, snappy.ErrCorrupt) { - lts, err = r.replayChunk(chnk.FName, compress.CompressionTypeS2) - } + for len(shards) > 0 { + bmeta, err := r.cn.GetRestoreMeta(r.name) if err != nil { - return errors.Wrapf(err, "replay chunk %v.%v", chnk.StartTS.T, chnk.EndTS.T) + return nil, errors.Wrap(err, "get restore metadata") } - if waitTxnErr != nil { - return errors.Wrap(err, "check waiting transactions") + clusterTime, err := r.cn.ClusterTime() + if err != nil { + return nil, errors.Wrap(err, "read cluster time") } - } - - r.log.Info("oplog replay finished on %v", lts) - - return nil -} - -func (r *Restore) checkWaitingTxns(observedTxn map[string]struct{}) error { - rmeta, err := r.cn.GetRestoreMeta(r.name) - if err != nil { - return errors.Wrap(err, "get restore metadata") - } - for _, shard := range rmeta.Replsets { - if _, ok := observedTxn[shard.Txn.ID]; shard.Txn.State == pbm.TxnCommit && !ok { - ts := primitive.Timestamp{r.oplog.LastOpTS(), 0} - if primitive.CompareTimestamp(ts, shard.Txn.Ctime) > 0 { - err := r.cn.SetCurrentOp(r.name, r.nodeInfo.SetName, ts) + // not going directly thru bmeta.Replsets to be sure we've heard back + // from all participated in the restore shards. + for _, shard := range bmeta.Replsets { + if _, ok := shards[shard.Name]; !ok { + continue + } + // check if node alive + lock, err := r.cn.GetLockData(&pbm.LockHeader{ + Type: pbm.CmdRestore, + OPID: r.opid, + Replset: shard.Name, + }) + + // nodes are cleaning its locks moving to the done status + // so no lock is ok, and no need to check the heartbeats + if err != mongo.ErrNoDocuments { if err != nil { - return errors.Wrap(err, "set current op timestamp") + return nil, errors.Wrapf(err, "unable to read lock for shard %s", shard.Name) + } + if lock.Heartbeat.T+pbm.StaleFrameSec < clusterTime.T { + return nil, errors.Errorf("lost shard %s, last beat ts: %d", shard.Name, lock.Heartbeat.T) } } - observedTxn[shard.Txn.ID] = struct{}{} - return nil - } - } - - return nil -} - -func (r *Restore) txnWaitForShards(txn pbm.RestoreTxn) (pbm.TxnState, error) { - tk := time.NewTicker(time.Second * 1) - defer tk.Stop() - for { - select { - case <-tk.C: - s, err := r.checkTxn(txn) - if err != nil { - return pbm.TxnUnknown, err + if shard.Status == pbm.StatusError { + return nil, errors.Errorf("shard %s failed with: %v", shard.Name, shard.Error) } - if s == pbm.TxnCommit || s == pbm.TxnAbort { - return s, nil + + if shard.CommittedTxnSet { + for _, t := range shard.CommittedTxn { + if t.State == pbm.TxnCommit { + txn[t.ID] = t.Ctime + } + } + delete(shards, shard.Name) } - case <-r.cn.Context().Done(): - return pbm.TxnAbort, nil } + time.Sleep(time.Second * 1) } + + return txn, nil } -func (r *Restore) checkTxn(txn pbm.RestoreTxn) (pbm.TxnState, error) { - bmeta, err := r.cn.GetRestoreMeta(r.name) - if err != nil { - return pbm.TxnUnknown, errors.Wrap(err, "get restore metadata") +func (r *Restore) applyOplog(chunks []pbm.OplogChunk, options *applyOplogOption) error { + mgoV, err := r.node.GetMongoVersion() + if err != nil || len(mgoV.Version) < 1 { + return errors.Wrap(err, "define mongo version") } + stat := pbm.RestoreShardStat{} + partial, err := applyOplog(r.node.Session(), chunks, options, r.nodeInfo.IsSharded(), + r.indexCatalog, r.setcommittedTxn, r.getcommittedTxn, &stat.Txn, + mgoV, r.stg, r.log) - clusterTime, err := r.cn.ClusterTime() if err != nil { - return pbm.TxnUnknown, errors.Wrap(err, "read cluster time") + return 
errors.Wrap(err, "reply oplog") } - // not going directly thru bmeta.Replsets to be sure we've heard back - // from all participated in the restore shards. - shardsToFinish := len(r.shards) - for _, sh := range r.shards { - for _, shard := range bmeta.Replsets { - if shard.Name == sh.RS { - // check if node alive - lock, err := r.cn.GetLockData(&pbm.LockHeader{ - Type: pbm.CmdRestore, - OPID: r.opid, - Replset: shard.Name, - }) - - // nodes are cleaning its locks moving to the done status - // so no lock is ok and not need to ckech the heartbeats - if err != mongo.ErrNoDocuments { - if err != nil { - return pbm.TxnUnknown, errors.Wrapf(err, "unable to read lock for shard %s", shard.Name) - } - if lock.Heartbeat.T+pbm.StaleFrameSec < clusterTime.T { - return pbm.TxnUnknown, errors.Errorf("lost shard %s, last beat ts: %d", shard.Name, lock.Heartbeat.T) - } - } - - if shard.Status == pbm.StatusError { - return pbm.TxnUnknown, errors.Errorf("shard %s failed with: %v", shard.Name, shard.Error) - } - - if primitive.CompareTimestamp(shard.CurrentOp, txn.Ctime) == 1 { - shardsToFinish-- - continue - } - - if shard.Txn.ID != txn.ID { - if shard.Status == pbm.StatusDone || - primitive.CompareTimestamp(shard.Txn.Ctime, txn.Ctime) == 1 { - shardsToFinish-- - continue - } - return pbm.TxnUnknown, nil - } + if len(partial) > 0 { + tops := []db.Oplog{} + for _, t := range partial { + tops = append(tops, t.Oplog...) + } - // check status - switch shard.Txn.State { - case pbm.TxnPrepare: - if shard.Status == pbm.StatusDone { - return pbm.TxnAbort, nil - } - return pbm.TxnUnknown, nil - case pbm.TxnAbort: - return pbm.TxnAbort, nil - case pbm.TxnCommit: - shardsToFinish-- - } - } + err = r.cn.RestoreSetRSPartTxn(r.name, r.nodeInfo.SetName, tops) + if err != nil { + return errors.Wrap(err, "set partial transactions") } } - if shardsToFinish == 0 { - return pbm.TxnCommit, nil + err = r.cn.RestoreSetRSStat(r.name, r.nodeInfo.SetName, stat) + if err != nil { + r.log.Warning("applyOplog: failed to set stat: %v", err) } - return pbm.TxnUnknown, nil + return nil } func (r *Restore) snapshot(input io.Reader) (err error) { @@ -1243,24 +1028,6 @@ func (r *Restore) snapshot(input io.Reader) (err error) { return err } -func (r *Restore) replayChunk(file string, c compress.CompressionType) (lts primitive.Timestamp, err error) { - or, err := r.stg.SourceReader(file) - if err != nil { - return lts, errors.Wrapf(err, "get object %s form the storage", file) - } - defer or.Close() - - oplogReader, err := compress.Decompress(or, c) - if err != nil { - return lts, errors.Wrapf(err, "decompress object %s", file) - } - defer oplogReader.Close() - - lts, err = r.oplog.Apply(oplogReader) - - return lts, errors.Wrap(err, "apply oplog for chunk") -} - // Done waits for the replicas to finish the job // and marks restore as done func (r *Restore) Done() error { @@ -1274,6 +1041,32 @@ func (r *Restore) Done() error { if err != nil { return errors.Wrap(err, "check cluster for the restore done") } + + m, err := r.cn.GetRestoreMeta(r.name) + if err != nil { + return errors.Wrap(err, "update stat: get restore meta") + } + if m == nil { + return nil + } + + stat := make(map[string]map[string]pbm.RestoreRSMetrics) + + for _, rs := range m.Replsets { + stat[rs.Name] = map[string]pbm.RestoreRSMetrics{ + "_primary": {DistTxn: pbm.DistTxnStat{ + Partial: rs.Stat.Txn.Partial, + ShardUncommitted: rs.Stat.Txn.ShardUncommitted, + LeftUncommitted: rs.Stat.Txn.LeftUncommitted, + }}, + } + + } + + err = r.cn.RestoreSetStat(r.name, pbm.RestoreStat{RS: 
stat}) + if err != nil { + return errors.Wrap(err, "set restore stat") + } } return nil diff --git a/pbm/restore/physical.go b/pbm/restore/physical.go index edd0b5d04..3336fca95 100644 --- a/pbm/restore/physical.go +++ b/pbm/restore/physical.go @@ -19,6 +19,7 @@ import ( "sync" "time" + "github.com/mongodb/mongo-tools/common/db" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" @@ -661,7 +662,7 @@ func (l *logBuff) Flush() error { // - CLI provided values // - replset metada in the datadir // - backup meta -func (r *PhysRestore) Snapshot(cmd *pbm.RestoreCmd, opid pbm.OPID, l *log.Event, stopAgentC chan<- struct{}, pauseHB func()) (err error) { +func (r *PhysRestore) Snapshot(cmd *pbm.RestoreCmd, pitr primitive.Timestamp, opid pbm.OPID, l *log.Event, stopAgentC chan<- struct{}, pauseHB func()) (err error) { l.Debug("port: %d", r.tmpPort) meta := &pbm.RestoreMeta{ @@ -713,6 +714,14 @@ func (r *PhysRestore) Snapshot(cmd *pbm.RestoreCmd, opid pbm.OPID, l *log.Event, meta.Type = r.bcp.Type } + var opChunks []pbm.OplogChunk + if !pitr.IsZero() { + opChunks, err = chunks(r.cn, r.stg, r.restoreTS, pitr, r.rsConf.ID, r.rsMap) + if err != nil { + return err + } + } + if meta.Type == pbm.IncrementalBackup { meta.BcpChain = make([]string, 0, len(r.files)) for i := len(r.files) - 1; i >= 0; i-- { @@ -771,6 +780,7 @@ func (r *PhysRestore) Snapshot(cmd *pbm.RestoreCmd, opid pbm.OPID, l *log.Event, progress |= restoreStared var excfg *pbm.MongodOpts + var stats pbm.RestoreShardStat if cmd.External { _, err = r.toState(pbm.StatusCopyReady) @@ -811,14 +821,10 @@ func (r *PhysRestore) Snapshot(cmd *pbm.RestoreCmd, opid pbm.OPID, l *log.Event, } } else { l.Info("copying backup data") - dstat, err := r.copyFiles() + stats.D, err = r.copyFiles() if err != nil { return errors.Wrap(err, "copy files") } - err = r.writeStat(dstat) - if err != nil { - r.log.Warning("write download stat: %v", err) - } } if o, ok := cmd.ExtConf[r.nodeInfo.SetName]; ok { @@ -849,6 +855,14 @@ func (r *PhysRestore) Snapshot(cmd *pbm.RestoreCmd, opid pbm.OPID, l *log.Event, return errors.Wrap(err, "recover oplog as standalone") } + if !pitr.IsZero() && r.nodeInfo.IsPrimary { + l.Info("replaying pitr oplog") + err = r.replayOplog(r.bcp.LastWriteTS, pitr, opChunks, &stats) + if err != nil { + return errors.Wrap(err, "replay pitr oplog") + } + } + l.Info("clean-up and reset replicaset config") err = r.resetRS() if err != nil { @@ -866,6 +880,11 @@ func (r *PhysRestore) Snapshot(cmd *pbm.RestoreCmd, opid pbm.OPID, l *log.Event, return errors.Wrapf(err, "moving to state %s", pbm.StatusDone) } + err = r.writeStat(stats) + if err != nil { + r.log.Warning("write download stat: %v", err) + } + r.log.Info("writing restore meta") err = r.dumpMeta(meta, stat, "") if err != nil { @@ -930,12 +949,7 @@ func (r *PhysRestore) cleanupDatadir(bcpFiles []pbm.File) error { } func (r *PhysRestore) writeStat(stat any) error { - d := struct { - D any `json:"d"` - }{ - D: stat, - } - b, err := json.Marshal(d) + b, err := json.Marshal(stat) if err != nil { return errors.Wrap(err, "marshal") } @@ -1203,6 +1217,100 @@ func (r *PhysRestore) recoverStandalone() error { return nil } +func (r *PhysRestore) replayOplog(from, to primitive.Timestamp, opChunks []pbm.OplogChunk, stat *pbm.RestoreShardStat) error { + err := r.startMongo("--dbpath", r.dbpath, + "--setParameter", "disableLogicalSessionCacheRefresh=true") + if err != nil { + return errors.Wrap(err, "start mongo") + } + + c, err := tryConn(5, time.Minute*5, 
r.tmpPort, path.Join(r.dbpath, internalMongodLog)) + if err != nil { + return errors.Wrap(err, "connect to mongo") + } + + ctx := context.Background() + _, err = c.Database("local").Collection("system.replset").InsertOne(ctx, + pbm.RSConfig{ + ID: r.rsConf.ID, + CSRS: r.nodeInfo.IsConfigSrv(), + Version: 1, + Members: []pbm.RSMember{{ + ID: 0, + Host: "localhost:" + strconv.Itoa(r.tmpPort), + Votes: 1, + Priority: 1, + BuildIndexes: true, + }}, + }, + ) + if err != nil { + return errors.Wrapf(err, "upate rs.member host to %s", r.nodeInfo.Me) + } + + err = shutdown(c, r.dbpath) + if err != nil { + return errors.Wrap(err, "shutdown mongo") + } + + flags := []string{"--dbpath", r.dbpath, + "--setParameter", "disableLogicalSessionCacheRefresh=true", + "--setParameter", "takeUnstableCheckpointOnShutdown=true", + "--replSet", r.rsConf.ID} + if r.nodeInfo.IsConfigSrv() { + flags = append(flags, "--configsvr") + } + err = r.startMongo(flags...) + if err != nil { + return errors.Wrap(err, "start mongo as rs") + } + + c, err = tryConn(5, time.Minute*5, r.tmpPort, path.Join(r.dbpath, internalMongodLog)) + if err != nil { + return errors.Wrap(err, "connect to mongo rs") + } + + mgoV, err := pbm.GetMongoVersion(ctx, c) + if err != nil || len(mgoV.Version) < 1 { + return errors.Wrap(err, "define mongo version") + } + + oplogOption := applyOplogOption{ + start: &from, + end: &to, + unsafe: true, + } + partial, err := applyOplog(c, opChunks, &oplogOption, r.nodeInfo.IsSharded(), + nil, r.setcommittedTxn, r.getcommittedTxn, &stat.Txn, + &mgoV, r.stg, r.log) + if err != nil { + return errors.Wrap(err, "reply oplog") + } + if len(partial) > 0 { + tops := []db.Oplog{} + for _, t := range partial { + tops = append(tops, t.Oplog...) + } + + var b bytes.Buffer + err := json.NewEncoder(&b).Encode(tops) + if err != nil { + return errors.Wrap(err, "encode") + } + err = r.stg.Save(r.syncPathRS+".partTxn", &b, int64(b.Len())) + if err != nil { + return errors.Wrap(err, "write partial transactions") + } + } + + err = shutdown(c, r.dbpath) + if err != nil { + return errors.Wrap(err, "shutdown mongo") + } + + return nil +} + func (r *PhysRestore) resetRS() error { err := r.startMongo("--dbpath", r.dbpath, "--setParameter", "disableLogicalSessionCacheRefresh=true", @@ -1436,6 +1544,59 @@ func (r *PhysRestore) agreeCommonRestoreTS() (ts primitive.Timestamp, err error) return ts, nil } +func (r *PhysRestore) setcommittedTxn(txn []pbm.RestoreTxn) error { + if txn == nil { + txn = []pbm.RestoreTxn{} + } + var b bytes.Buffer + err := json.NewEncoder(&b).Encode(txn) + if err != nil { + return errors.Wrap(err, "encode") + } + return r.stg.Save(r.syncPathRS+".txn", + &b, int64(b.Len()), + ) +} + +func (r *PhysRestore) getcommittedTxn() (map[string]primitive.Timestamp, error) { + shards := copyMap(r.syncPathShards) + txn := make(map[string]primitive.Timestamp) + for len(shards) > 0 { + for f := range shards { + dr, err := r.stg.FileStat(f + "." 
+ string(pbm.StatusDone)) + if err != nil && !errors.Is(err, storage.ErrNotExist) { + return nil, errors.Wrapf(err, "check done for <%s>", f) + } + if err == nil && dr.Size != 0 { + delete(shards, f) + continue + } + + txnr, err := r.stg.SourceReader(f + ".txn") + if err != nil && errors.Is(err, storage.ErrNotExist) { + continue + } + if err != nil { + return nil, errors.Wrapf(err, "get txns <%s>", f) + } + txns := []pbm.RestoreTxn{} + err = json.NewDecoder(txnr).Decode(&txns) + if err != nil { + return nil, errors.Wrapf(err, "deconde txns <%s>", f) + } + for _, t := range txns { + if t.State == pbm.TxnCommit { + txn[t.ID] = t.Ctime + } + } + delete(shards, f) + } + time.Sleep(time.Second * 5) + } + + return txn, nil +} + // Tries to connect to mongo n times, timeout is applied for each try. // If a try is unsuccessful, it will check the mongo logs and retry if // there are no errors or fatals. @@ -1586,24 +1747,22 @@ func (r *PhysRestore) init(name string, opid pbm.OPID, l *log.Event) (err error) } } - r.syncPathShards = make(map[string]struct{}) dsh, err := r.cn.ClusterMembers() if err != nil { return errors.Wrap(err, "get shards") } + r.syncPathShards = make(map[string]struct{}) for _, s := range dsh { - p := fmt.Sprintf("%s/%s/rs.%s/rs", pbm.PhysRestoresDir, r.name, s.RS) - r.syncPathShards[p] = struct{}{} + r.syncPathShards[fmt.Sprintf("%s/%s/rs.%s/rs", pbm.PhysRestoresDir, r.name, s.RS)] = struct{}{} } - r.syncPathDataShards = make(map[string]struct{}) sh, err := r.cn.GetShards() if err != nil { return errors.Wrap(err, "get data shards") } + r.syncPathDataShards = make(map[string]struct{}) for _, s := range sh { - p := fmt.Sprintf("%s/%s/rs.%s/rs", pbm.PhysRestoresDir, r.name, s.RS) - r.syncPathDataShards[p] = struct{}{} + r.syncPathDataShards[fmt.Sprintf("%s/%s/rs.%s/rs", pbm.PhysRestoresDir, r.name, s.RS)] = struct{}{} } err = r.hb() diff --git a/pbm/restore/restore.go b/pbm/restore/restore.go index f11c191b6..31806464a 100644 --- a/pbm/restore/restore.go +++ b/pbm/restore/restore.go @@ -4,13 +4,18 @@ import ( "encoding/json" "time" + "github.com/golang/snappy" + "github.com/mongodb/mongo-tools/common/idx" mlog "github.com/mongodb/mongo-tools/common/log" "github.com/mongodb/mongo-tools/common/options" "github.com/pkg/errors" + "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "github.com/percona/percona-backup-mongodb/pbm" + "github.com/percona/percona-backup-mongodb/pbm/compress" "github.com/percona/percona-backup-mongodb/pbm/log" + "github.com/percona/percona-backup-mongodb/pbm/oplog" "github.com/percona/percona-backup-mongodb/pbm/storage" ) @@ -204,41 +209,191 @@ func waitForStatus(cn *pbm.PBM, name string, status pbm.Status) error { } } -func waitForStatusT(cn *pbm.PBM, name string, status pbm.Status, t time.Duration) error { - tk := time.NewTicker(time.Second * 1) - defer tk.Stop() - tout := time.NewTicker(t) - defer tout.Stop() - for { - select { - case <-tk.C: - meta, err := cn.GetRestoreMeta(name) - if errors.Is(err, pbm.ErrNotFound) { - continue - } +func GetBaseBackup(cn *pbm.PBM, bcpName string, tsTo primitive.Timestamp, stg storage.Storage) (bcp *pbm.BackupMeta, err error) { + if bcpName == "" { + bcp, err = cn.GetLastBackup(&tsTo) + if errors.Is(err, pbm.ErrNotFound) { + return nil, errors.Errorf("no backup found before ts %v", tsTo) + } + if err != nil { + return nil, errors.Wrap(err, "define last backup") + } + return bcp, nil + } + + bcp, err = SnapshotMeta(cn, bcpName, stg) + if err != nil { + return nil, err + } + if 
primitive.CompareTimestamp(bcp.LastWriteTS, tsTo) >= 0 { + return nil, errors.New("snapshot's last write is later than the target time. Try to set an earlier snapshot. Or leave the snapshot empty so PBM will choose one.") + } + + return bcp, nil +} + +// chunks defines chunks of oplog slice in given range, ensures its integrity (timeline +// is contiguous - there are no gaps), checks for respective files on storage and returns +// chunks list if all checks passed +func chunks(cn *pbm.PBM, stg storage.Storage, from, to primitive.Timestamp, rsName string, rsMap map[string]string) ([]pbm.OplogChunk, error) { + mapRevRS := pbm.MakeReverseRSMapFunc(rsMap) + chunks, err := cn.PITRGetChunksSlice(mapRevRS(rsName), from, to) + if err != nil { + return nil, errors.Wrap(err, "get chunks index") + } + + if len(chunks) == 0 { + return nil, errors.New("no chunks found") + } + + if primitive.CompareTimestamp(chunks[len(chunks)-1].EndTS, to) == -1 { + return nil, errors.Errorf("no chunk with the target time, the last chunk ends on %v", chunks[len(chunks)-1].EndTS) + } + + last := from + for _, c := range chunks { + if primitive.CompareTimestamp(last, c.StartTS) == -1 { + return nil, errors.Errorf("integrity vilolated, expect chunk with start_ts %v, but got %v", last, c.StartTS) + } + last = c.EndTS + + _, err := stg.FileStat(c.FName) + if err != nil { + return nil, errors.Errorf("failed to ensure chunk %v.%v on the storage, file: %s, error: %v", c.StartTS, c.EndTS, c.FName, err) + } + } + + return chunks, nil +} + +type applyOplogOption struct { + start *primitive.Timestamp + end *primitive.Timestamp + nss []string + unsafe bool + filter oplog.OpFilter +} + +type setcommittedTxnFn func(txn []pbm.RestoreTxn) error +type getcommittedTxnFn func() (map[string]primitive.Timestamp, error) + +// By looking at just transactions in the oplog we can't tell which shards +// were participating in it. But we can assume that if there is +// commitTransaction at least on one shard then the transaction is committed +// everywhere. Otherwise, transactions won't be in the oplog or everywhere +// would be transactionAbort. So we just treat distributed as +// non-distributed - apply opps once a commit message for this txn is +// encountered. +// It might happen that by the end of the oplog there are some distributed txns +// without commit messages. We should commit such transactions only if the data is +// full (all prepared statements observed) and this txn was committed at least by +// one other shard. For that, each shard saves the last 100 dist transactions +// that were committed, so other shards can check if they should commit their +// leftovers. We store the last 100, as prepared statements and commits might be +// separated by other oplog events so it might happen that several commit messages +// can be cut away on some shards but present on other(s). Given oplog events of +// dist txns are more or less aligned in [cluster]time, checking the last 100 +// should be more than enough. +// If the transaction is more than 16Mb it will be split into several prepared +// messages. So it might happen that one shard committed the txn but another has +// observed not all prepared messages by the end of the oplog. In such a case we +// should report it in logs and describe-restore. 
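A sketch of the leftover decision the comment above describes, under hypothetical types rather than PBM's TxnLeftovers/HandleUncommittedTxn: a shard commits a leftover distributed transaction only when another shard reported it committed and the local prepare entries are complete; committed-elsewhere but incomplete transactions are surfaced as partial, the rest are dropped.

package txnsketch

import "go.mongodb.org/mongo-driver/bson/primitive"

// leftoverTxn is a hypothetical stand-in for a distributed transaction whose
// commit message was not seen by the end of this shard's oplog slice.
type leftoverTxn struct {
	ID       string
	Complete bool // all prepare entries for the txn were observed locally
}

// resolveLeftovers partitions leftovers using the committed-txn lists
// gathered from the other shards (otherCommits, keyed by txn ID).
func resolveLeftovers(local []leftoverTxn, otherCommits map[string]primitive.Timestamp) (commit, partial, drop []leftoverTxn) {
	for _, t := range local {
		_, committedElsewhere := otherCommits[t.ID]
		switch {
		case committedElsewhere && t.Complete:
			commit = append(commit, t)
		case committedElsewhere:
			partial = append(partial, t) // reported in logs and describe-restore
		default:
			drop = append(drop, t)
		}
	}
	return commit, partial, drop
}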
+func applyOplog(node *mongo.Client, chunks []pbm.OplogChunk, options *applyOplogOption, sharded bool, + ic *idx.IndexCatalog, setTxn setcommittedTxnFn, getTxn getcommittedTxnFn, stat *pbm.DistTxnStat, + mgoV *pbm.MongoVersion, stg storage.Storage, log *log.Event) (partial []oplog.Txn, err error) { + log.Info("starting oplog replay") + + var ( + ctxn chan pbm.RestoreTxn + txnSyncErr chan error + ) + + oplogRestore, err := oplog.NewOplogRestore(node, ic, mgoV, options.unsafe, true, ctxn, txnSyncErr) + if err != nil { + return nil, errors.Wrap(err, "create oplog") + } + + oplogRestore.SetOpFilter(options.filter) + + var startTS, endTS primitive.Timestamp + if options.start != nil { + startTS = *options.start + } + if options.end != nil { + endTS = *options.end + } + oplogRestore.SetTimeframe(startTS, endTS) + oplogRestore.SetIncludeNS(options.nss) + + var lts primitive.Timestamp + for _, chnk := range chunks { + log.Debug("+ applying %v", chnk) + + // If the compression is Snappy and it failed we try S2. + // Up until v1.7.0 the compression of pitr chunks was always S2. + // But it was a mess in the code which lead to saving pitr chunk files + // with the `.snappy`` extension although it was S2 in fact. And during + // the restore, decompression treated .snappy as S2 ¯\_(ツ)_/¯ It wasn’t + // an issue since there was no choice. Now, Snappy produces `.snappy` files + // and S2 - `.s2` which is ok. But this means the old chunks (made by previous + // PBM versions) won’t be compatible - during the restore, PBM will treat such + // files as Snappy (judging by its suffix) but in fact, they are s2 files + // and restore will fail with snappy: corrupt input. So we try S2 in such a case. + lts, err = replayChunk(chnk.FName, oplogRestore, stg, chnk.Compression) + if err != nil && errors.Is(err, snappy.ErrCorrupt) { + lts, err = replayChunk(chnk.FName, oplogRestore, stg, compress.CompressionTypeS2) + } + if err != nil { + return nil, errors.Wrapf(err, "replay chunk %v.%v", chnk.StartTS.T, chnk.EndTS.T) + } + } + + // dealing with dist txns + if sharded { + uc, c := oplogRestore.TxnLeftovers() + stat.ShardUncommitted = len(uc) + go func() { + err := setTxn(c) if err != nil { - return errors.Wrap(err, "get restore metadata") + log.Error("write last committed txns %v", err) } - - clusterTime, err := cn.ClusterTime() + }() + if len(uc) > 0 { + commits, err := getTxn() if err != nil { - return errors.Wrap(err, "read cluster time") + return nil, errors.Wrap(err, "get committed txns on other shards") } - - if meta.Hb.T+pbm.StaleFrameSec < clusterTime.T { - return errors.Errorf("restore stuck, last beat ts: %d", meta.Hb.T) + var uncomm []oplog.Txn + partial, uncomm, err = oplogRestore.HandleUncommittedTxn(commits) + if err != nil { + return nil, errors.Wrap(err, "handle ucommitted transactions") } - - switch meta.Status { - case status: - return nil - case pbm.StatusError: - return errors.Errorf("cluster failed: %s", meta.Error) + if len(uncomm) > 0 { + log.Info("uncommitted txns %d", len(uncomm)) } - case <-tout.C: - return errConvergeTimeOut - case <-cn.Context().Done(): - return nil + stat.Partial = len(partial) + stat.LeftUncommitted = len(uncomm) } } + log.Info("oplog replay finished on %v", lts) + + return partial, nil +} + +func replayChunk(file string, oplog *oplog.OplogRestore, stg storage.Storage, c compress.CompressionType) (lts primitive.Timestamp, err error) { + or, err := stg.SourceReader(file) + if err != nil { + return lts, errors.Wrapf(err, "get object %s form the storage", file) + } + defer 
or.Close() + + oplogReader, err := compress.Decompress(or, c) + if err != nil { + return lts, errors.Wrapf(err, "decompress object %s", file) + } + defer oplogReader.Close() + + lts, err = oplog.Apply(oplogReader) + + return lts, errors.Wrap(err, "apply oplog for chunk") } diff --git a/pbm/rsync.go b/pbm/rsync.go index 967344923..1ff3e98ed 100644 --- a/pbm/rsync.go +++ b/pbm/rsync.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" + "github.com/mongodb/mongo-tools/common/db" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo/options" @@ -18,7 +19,6 @@ import ( "github.com/percona/percona-backup-mongodb/pbm/archive" "github.com/percona/percona-backup-mongodb/pbm/log" "github.com/percona/percona-backup-mongodb/pbm/storage" - "github.com/percona/percona-backup-mongodb/pbm/storage/s3" "github.com/percona/percona-backup-mongodb/version" ) @@ -325,6 +325,27 @@ func ParsePhysRestoreStatus(restore string, stg storage.Storage, l *log.Event) ( rs.nodes[nName] = node case "rs": + if p[1] == "txn" { + continue + } + if p[1] == "partTxn" { + src, err := stg.SourceReader(filepath.Join(PhysRestoresDir, restore, f.Name)) + if err != nil { + l.Error("get partial txn file %s: %v", f.Name, err) + break + } + + ops := []db.Oplog{} + err = json.NewDecoder(src).Decode(&ops) + if err != nil { + l.Error("unmarshal partial txn %s: %v", f.Name, err) + break + } + rs.rs.PartialTxn = append(rs.rs.PartialTxn, ops...) + rss[rsName] = rs + continue + } + cond, err := parsePhysRestoreCond(stg, f.Name, restore) if err != nil { return nil, err @@ -345,21 +366,26 @@ func ParsePhysRestoreStatus(restore string, stg storage.Storage, l *log.Event) ( break } if meta.Stat == nil { - meta.Stat = &RestoreStat{Download: make(map[string]map[string]s3.DownloadStat)} + meta.Stat = &RestoreStat{RS: make(map[string]map[string]RestoreRSMetrics)} } - st := struct { - D s3.DownloadStat `json:"d"` - }{} + st := RestoreShardStat{} err = json.NewDecoder(src).Decode(&st) if err != nil { l.Error("unmarshal stat file %s: %v", f.Name, err) break } - if _, ok := meta.Stat.Download[rsName]; !ok { - meta.Stat.Download[rsName] = make(map[string]s3.DownloadStat) + if _, ok := meta.Stat.RS[rsName]; !ok { + meta.Stat.RS[rsName] = make(map[string]RestoreRSMetrics) } nName := strings.Join(p[1:], ".") - meta.Stat.Download[rsName][nName] = st.D + lstat := meta.Stat.RS[rsName][nName] + lstat.DistTxn.Partial += st.Txn.Partial + lstat.DistTxn.ShardUncommitted += st.Txn.ShardUncommitted + lstat.DistTxn.LeftUncommitted += st.Txn.LeftUncommitted + if st.D != nil { + lstat.Download = *st.D + } + meta.Stat.RS[rsName][nName] = lstat } rss[rsName] = rs diff --git a/pbm/storage/azure/azure.go b/pbm/storage/azure/azure.go index 9c2d19c3b..27c508905 100644 --- a/pbm/storage/azure/azure.go +++ b/pbm/storage/azure/azure.go @@ -201,6 +201,9 @@ func (b *Blob) Copy(src, dst string) error { func (b *Blob) SourceReader(name string) (io.ReadCloser, error) { o, err := b.c.DownloadStream(context.TODO(), b.opts.Container, path.Join(b.opts.Prefix, name), nil) if err != nil { + if isNotFound(err) { + return nil, storage.ErrNotExist + } return nil, errors.Wrap(err, "download object") } diff --git a/pbm/storage/fs/fs.go b/pbm/storage/fs/fs.go index 71d95ed78..cc3b8c20e 100644 --- a/pbm/storage/fs/fs.go +++ b/pbm/storage/fs/fs.go @@ -67,6 +67,9 @@ func (fs *FS) Save(name string, data io.Reader, _ int64) error { func (fs *FS) SourceReader(name string) (io.ReadCloser, error) { filepath := path.Join(fs.opts.Path, name) fr, err := os.Open(filepath) 
+ if errors.Is(err, os.ErrNotExist) { + return nil, storage.ErrNotExist + } return fr, errors.Wrapf(err, "open file '%s'", filepath) }
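The last two hunks make SourceReader return a common sentinel for missing objects, so callers can poll for files with errors.Is regardless of the storage backend. A minimal sketch of the pattern, assuming a simplified filesystem backend rather than the actual pbm/storage package:

package storagesketch

import (
	"errors"
	"io"
	"os"
	"path/filepath"
)

// ErrNotExist is the sentinel that backends translate their provider-specific
// "not found" errors into.
var ErrNotExist = errors.New("no such file")

type FS struct{ Root string }

func (f *FS) SourceReader(name string) (io.ReadCloser, error) {
	r, err := os.Open(filepath.Join(f.Root, name))
	if errors.Is(err, os.ErrNotExist) {
		return nil, ErrNotExist
	}
	return r, err
}

// waitForFile shows the caller side: a missing file means "not ready yet",
// any other error is fatal.
func waitForFile(stg *FS, name string) (io.ReadCloser, bool, error) {
	r, err := stg.SourceReader(name)
	if errors.Is(err, ErrNotExist) {
		return nil, false, nil // keep polling
	}
	if err != nil {
		return nil, false, err
	}
	return r, true, nil
}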