Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PBM-815: physical restore + logical PITR #844

Merged
merged 23 commits into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,6 @@ func (a *Agent) Start() error {
a.OplogReplay(cmd.Replay, cmd.OPID, ep)
case pbm.CmdResync:
a.Resync(cmd.OPID, ep)
case pbm.CmdPITRestore:
a.PITRestore(cmd.PITRestore, cmd.OPID, ep)
case pbm.CmdDeleteBackup:
a.Delete(cmd.Delete, cmd.OPID, ep)
case pbm.CmdDeletePITR:
Expand Down Expand Up @@ -447,7 +445,7 @@ func (a *Agent) acquireLock(l *pbm.Lock, lg *log.Event, acquireFn lockAquireFn)
switch lk.Type {
case pbm.CmdBackup:
fn = a.pbm.MarkBcpStale
case pbm.CmdRestore, pbm.CmdPITRestore:
case pbm.CmdRestore:
fn = a.pbm.MarkRestoreStale
default:
return acquireFn()
Expand Down
170 changes: 0 additions & 170 deletions agent/snapshot.go → agent/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"github.com/percona/percona-backup-mongodb/pbm"
"github.com/percona/percona-backup-mongodb/pbm/backup"
"github.com/percona/percona-backup-mongodb/pbm/log"
"github.com/percona/percona-backup-mongodb/pbm/restore"
"github.com/percona/percona-backup-mongodb/pbm/storage"
)

type currentBackup struct {
Expand Down Expand Up @@ -285,171 +283,3 @@ func (a *Agent) waitNomination(bcp, rs, node string, l *log.Event) (got bool, er
}
}
}

// Restore dispatches an incoming restore command to the logical or the
// physical restore path, depending on the type of the base backup.
// With an external restore and no backup name, the type is assumed to be
// pbm.ExternalBackup; otherwise, the backup metadata is looked up first in
// the control collections and then, as a fallback, directly in the storage.
func (a *Agent) Restore(r *pbm.RestoreCmd, opid pbm.OPID, ep pbm.Epoch) {
	if r == nil {
		a.log.NewEvent(string(pbm.CmdRestore), "", opid.String(), ep.TS()).
			Error("missed command")
		return
	}

	l := a.log.NewEvent(string(pbm.CmdRestore), r.Name, opid.String(), ep.TS())

	bcpType := pbm.ExternalBackup
	if !(r.External && r.BackupName == "") {
		l.Info("backup: %s", r.BackupName)

		bcp, err := a.pbm.GetBackupMeta(r.BackupName)
		if errors.Is(err, pbm.ErrNotFound) {
			// Metadata is not in the control collections yet —
			// try reading it straight from the storage.
			var stg storage.Storage
			stg, err = a.pbm.GetStorage(l)
			if err != nil {
				l.Error("get storage: %v", err)
				return
			}
			bcp, err = restore.GetMetaFromStore(stg, r.BackupName)
		}
		if err != nil {
			l.Error("get backup metadata: %v", err)
			return
		}
		bcpType = bcp.Type
	}

	var err error
	switch bcpType {
	case pbm.PhysicalBackup, pbm.IncrementalBackup, pbm.ExternalBackup:
		err = a.restorePhysical(r, opid, ep, l)
	default: // pbm.LogicalBackup and any unrecognized type
		err = a.restoreLogical(r, opid, ep, l)
	}
	if err != nil {
		l.Error("%v", err)
	}
}

// restoreLogical runs the logical restore of the given snapshot.
// It runs only on the replica set primary; a non-primary node returns an
// error. The restore lock is held for the whole operation, and the cluster
// leader resets the epoch after a successful restore.
func (a *Agent) restoreLogical(r *pbm.RestoreCmd, opid pbm.OPID, ep pbm.Epoch, l *log.Event) error {
	nodeInfo, err := a.node.GetInfo()
	if err != nil {
		return errors.Wrap(err, "get node info")
	}
	if !nodeInfo.IsPrimary {
		return errors.New("node is not primary so it's unsuitable to do restore")
	}

	// Take the restore lock so no other backup/restore runs concurrently.
	epts := ep.TS()
	lock := a.pbm.NewLock(pbm.LockHeader{
		Type:    pbm.CmdRestore,
		Replset: nodeInfo.SetName,
		Node:    nodeInfo.Me,
		OPID:    opid.String(),
		Epoch:   &epts,
	})

	got, err := a.acquireLock(lock, l, nil)
	if err != nil {
		return errors.Wrap(err, "acquiring lock")
	}
	if !got {
		l.Debug("skip: lock not acquired")
		return errors.New("unable to run the restore while another operation running")
	}

	defer func() {
		l.Debug("releasing lock")
		err := lock.Release()
		if err != nil {
			l.Error("release lock: %v", err)
		}
	}()

	l.Info("restore started")
	err = restore.New(a.pbm, a.node, r.RSMap).Snapshot(r, opid, l)
	if err != nil {
		if errors.Is(err, restore.ErrNoDataForShard) {
			// Not an error: this shard simply has no data in the backup.
			l.Info("no data for the shard in backup, skipping")
			return nil
		}

		return err
	}
	l.Info("restore finished successfully")

	// Only the cluster leader resets the epoch after the restore.
	if nodeInfo.IsLeader() {
		epch, err := a.pbm.ResetEpoch()
		if err != nil {
			return errors.Wrap(err, "reset epoch")
		}

		l.Debug("epoch set to %v", epch)
	}

	return nil
}

// restorePhysical runs the physical restore of the given snapshot.
// Unlike the logical restore, it runs on every node of the replica set;
// the restore lock is acquired only on the primary, solely to verify that
// no concurrent operation is running, and is released before the restore
// proceeds (the restore takes the cluster down, so the lock cannot be
// held through it).
func (a *Agent) restorePhysical(r *pbm.RestoreCmd, opid pbm.OPID, ep pbm.Epoch, l *log.Event) error {
	nodeInfo, err := a.node.GetInfo()
	if err != nil {
		return errors.Wrap(err, "get node info")
	}

	rstr, err := restore.NewPhysical(a.pbm, a.node, nodeInfo, r.RSMap)
	if err != nil {
		return errors.Wrap(err, "init physical restore")
	}

	// physical restore runs on all nodes in the replset
	// so we try lock only on primary only to be sure there
	// is no concurrent operation running.
	var lock *pbm.Lock
	if nodeInfo.IsPrimary {
		epts := ep.TS()
		lock = a.pbm.NewLock(pbm.LockHeader{
			Type:    pbm.CmdRestore,
			Replset: nodeInfo.SetName,
			Node:    nodeInfo.Me,
			OPID:    opid.String(),
			Epoch:   &epts,
		})

		got, err := a.acquireLock(lock, l, nil)
		if err != nil {
			return errors.Wrap(err, "acquiring lock")
		}
		if !got {
			l.Debug("skip: lock not acquired")
			return errors.New("unable to run the restore while another operation running")
		}
	}

	if lock != nil {
		// Don't care about errors. Anyway, the lock gonna disappear after the
		// restore. And the commands stream is down as well.
		// The lock also updates its heartbeats but Restore waits only for one state
		// with the timeout twice as short pbm.StaleFrameSec.
		_ = lock.Release()
	}

	l.Info("restore started")
	err = rstr.Snapshot(r, opid, l, a.closeCMD, a.HbPause)
	l.Info("restore finished %v", err)
	if err != nil {
		if errors.Is(err, restore.ErrNoDataForShard) {
			// Not an error: this shard simply has no data in the backup.
			l.Info("no data for the shard in backup, skipping")
			return nil
		}

		return err
	}
	l.Info("restore finished successfully")

	return nil
}
126 changes: 92 additions & 34 deletions agent/pitr.go → agent/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,57 +258,116 @@ func (a *Agent) pitrLockCheck() (moveOn bool, err error) {
return tl.Heartbeat.T+pbm.StaleFrameSec < ts.T, nil
}

// PITRestore starts the point-in-time recovery
func (a *Agent) PITRestore(r *pbm.PITRestoreCmd, opid pbm.OPID, ep pbm.Epoch) {
func (a *Agent) Restore(r *pbm.RestoreCmd, opid pbm.OPID, ep pbm.Epoch) {
if r == nil {
l := a.log.NewEvent(string(pbm.CmdPITRestore), "", opid.String(), ep.TS())
l := a.log.NewEvent(string(pbm.CmdRestore), "", opid.String(), ep.TS())
l.Error("missed command")
return
}

l := a.log.NewEvent(string(pbm.CmdPITRestore), r.Name, opid.String(), ep.TS())
l := a.log.NewEvent(string(pbm.CmdRestore), r.Name, opid.String(), ep.TS())

l.Info("to time: %s", time.Unix(r.TS, 0).UTC().Format(time.RFC3339))
if !r.OplogTS.IsZero() {
l.Info("to time: %s", time.Unix(int64(r.OplogTS.T), 0).UTC().Format(time.RFC3339))
}

nodeInfo, err := a.node.GetInfo()
if err != nil {
l.Error("get node info: %v", err)
return
}
if !nodeInfo.IsPrimary {
l.Info("Node in not suitable for restore")
return
}

epts := ep.TS()
lock := a.pbm.NewLock(pbm.LockHeader{
Type: pbm.CmdPITRestore,
Replset: nodeInfo.SetName,
Node: nodeInfo.Me,
OPID: opid.String(),
Epoch: &epts,
})
var lock *pbm.Lock
if nodeInfo.IsPrimary {
epts := ep.TS()
lock = a.pbm.NewLock(pbm.LockHeader{
Type: pbm.CmdRestore,
Replset: nodeInfo.SetName,
Node: nodeInfo.Me,
OPID: opid.String(),
Epoch: &epts,
})

got, err := a.acquireLock(lock, l, nil)
if err != nil {
l.Error("acquiring lock: %v", err)
return
got, err := a.acquireLock(lock, l, nil)
if err != nil {
l.Error("acquiring lock: %v", err)
return
}
if !got {
l.Debug("skip: lock not acquired")
l.Error("unable to run the restore while another backup or restore process running")
return
}

defer func() {
if lock == nil {
return
}
err := lock.Release()
if err != nil {
l.Error("release lock: %v", err)
}
}()
}
if !got {
l.Debug("skip: lock not acquired")
l.Error("unable to run the restore while another backup or restore process running")

stg, err := a.pbm.GetStorage(l)
if err != nil {
l.Error("get storage: %v", err)
return
}

defer func() {
err := lock.Release()
var bcpType pbm.BackupType
bcp := &pbm.BackupMeta{}

if r.External && r.BackupName == "" {
bcpType = pbm.ExternalBackup
} else {
l.Info("backup: %s", r.BackupName)
if !r.OplogTS.IsZero() {
bcp, err = restore.GetBaseBackup(a.pbm, r.BackupName, r.OplogTS, stg)
} else {
bcp, err = restore.SnapshotMeta(a.pbm, r.BackupName, stg)
}
if err != nil {
l.Error("release lock: %v", err)
l.Error("define base backup: %v", err)
return
}
}()
bcpType = bcp.Type
}

l.Info("recovery started")
err = restore.New(a.pbm, a.node, r.RSMap).PITR(r, opid, l)

switch bcpType {
case pbm.LogicalBackup:
if !nodeInfo.IsPrimary {
l.Info("Node in not suitable for restore")
return
}
if r.OplogTS.IsZero() {
err = restore.New(a.pbm, a.node, r.RSMap).Snapshot(r, opid, l)
} else {
err = restore.New(a.pbm, a.node, r.RSMap).PITR(r, opid, l)
}
case pbm.PhysicalBackup, pbm.IncrementalBackup, pbm.ExternalBackup:
if lock != nil {
// Don't care about errors. Anyway, the lock gonna disappear after the
// restore. And the commands stream is down as well.
// The lock also updates its heartbeats but Restore waits only for one state
// with the timeout twice as short pbm.StaleFrameSec.
_ = lock.Release()
lock = nil
}

var rstr *restore.PhysRestore
rstr, err = restore.NewPhysical(a.pbm, a.node, nodeInfo, r.RSMap)
if err != nil {
l.Error("init physical backup: %v", err)
return
}

r.BackupName = bcp.Name
err = rstr.Snapshot(r, r.OplogTS, opid, l, a.closeCMD, a.HbPause)
}
if err != nil {
if errors.Is(err, restore.ErrNoDataForShard) {
l.Info("no data for the shard in backup, skipping")
Expand All @@ -317,15 +376,14 @@ func (a *Agent) PITRestore(r *pbm.PITRestoreCmd, opid pbm.OPID, ep pbm.Epoch) {
}
return
}
l.Info("recovery successfully finished")

if nodeInfo.IsLeader() {
if bcpType == pbm.LogicalBackup && nodeInfo.IsLeader() {
epch, err := a.pbm.ResetEpoch()
if err != nil {
l.Error("reset epoch")
return
l.Error("reset epoch: %v", err)
}

l.Debug("epoch set to %v", epch)
}

l.Info("recovery successfully finished")
}
2 changes: 1 addition & 1 deletion cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func Main() {
logsCmd.Flag("tail", "Show last N entries, 20 entries are shown by default, 0 for all logs").Short('t').Default("20").Int64Var(&logs.tail)
logsCmd.Flag("node", "Target node in format replset[/host:posrt]").Short('n').StringVar(&logs.node)
logsCmd.Flag("severity", "Severity level D, I, W, E or F, low to high. Choosing one includes higher levels too.").Short('s').Default("I").EnumVar(&logs.severity, "D", "I", "W", "E", "F")
logsCmd.Flag("event", "Event in format backup[/2020-10-06T11:45:14Z]. Events: backup, restore, cancelBackup, resync, pitr, pitrestore, delete").Short('e').StringVar(&logs.event)
logsCmd.Flag("event", "Event in format backup[/2020-10-06T11:45:14Z]. Events: backup, restore, cancelBackup, resync, pitr, delete").Short('e').StringVar(&logs.event)
logsCmd.Flag("opid", "Operation ID").Short('i').StringVar(&logs.opid)
logsCmd.Flag("timezone", "Timezone of log output. `Local`, `UTC` or a location name corresponding to a file in the IANA Time Zone database, such as `America/New_York`").StringVar(&logs.location)
logsCmd.Flag("extra", "Show extra data in text format").Hidden().Short('x').BoolVar(&logs.extr)
Expand Down
Loading
Loading