PBM-815: physical restore + logical PITR (#844)
Add an oplog replay stage to the restore data post-processing. All these steps run while the PSMDB cluster is down and not accessible from the outside world.

We cannot start a full replica set at this stage without exposing it to external users. So the oplog replay is done only on the "primary" node running as a single-node replica set. The data is propagated to the rest of the nodes during the cluster start. This leads to:
    1. PITR for a physical backup happens only on the primary node; the data is then copied to the other nodes during the cluster start (cluster initial sync)
    2. PITR for sharded collections works only for writes, not for the creation of sharded collections, therefore **whenever a sharded collection is created, a new full physical backup is needed**. Logical backups are not affected, since a logical restore restores the full replica set rather than a single node.
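The post-processing sequence above can be sketched roughly as follows. This is a minimal illustration only; the function names are hypothetical and do not reflect PBM's actual API, which drives mongod directly:

```go
package main

import "fmt"

// restoreSteps returns the post-restore processing order described
// above: oplog replay happens on the primary alone, in a single-node
// replica set, and the other nodes catch up via initial sync.
func restoreSteps(targetTime string) []string {
	return []string{
		"start mongod as a single-node replica set (primary only)",
		"replay oplog on the primary up to " + targetTime,
		"shut down; other nodes receive the data via initial sync on cluster start",
	}
}

func main() {
	for i, s := range restoreSteps("2023-07-03T16:23:56") {
		fmt.Printf("%d. %s\n", i+1, s)
	}
}
```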

The `--base-snapshot` flag must always be used to point at the physical base. For example: `pbm restore --time=2023-07-03T16:23:56 -w --base-snapshot=2023-07-03T16:18:09Z`. Without `--base-snapshot`, PBM always looks for a logical base backup, even if none exists or a physical one is more recent.

Distributed transactions
=========
The old way of syncing all distributed transactions between the shards won't work, as no DB is available during a physical restore, and doing such a sync over the remote storage would be way too slow.
The new algorithm (changed in logical restores as well, for code consistency):

By looking at just the transactions in the oplog, we can't tell which
shards were participating in them. But we can assume that if there is a
commitTransaction on at least one shard, then the transaction is committed
everywhere. Otherwise, the transaction either won't be in the oplog at all,
or every shard would have a transactionAbort. So we simply treat distributed
transactions as non-distributed: ops are applied once a commit message for
the txn is encountered.
It might happen that by the end of the oplog some distributed txns are left
without commit messages. We should commit such transactions only if the data
is full (all prepared statements were observed) and the txn was committed by
at least one other shard. For that, each shard saves the last 100 distributed
transactions it committed, so other shards can check whether they should
commit their leftovers. We store the last 100 because prepared statements and
commits might be separated by other oplog events, so several commit messages
can be cut off on some shards while present on others. Given that the oplog
events of distributed txns are more or less aligned in [cluster]time,
checking the last 100 should be more than enough.
If a transaction is larger than 16MB, it is split into several prepared
messages. So it might happen that one shard committed the txn while another
has not observed all prepared messages by the end of the oplog. In such a
case we report it in the logs and in describe-restore.
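The commit rule above can be sketched as follows. The types and names here are illustrative only, not PBM's actual implementation: ops of a distributed txn are buffered until a commitTransaction for it appears in this shard's oplog, and leftovers at the end are committed only if the txn shows up in another shard's recently-committed list:

```go
package main

import "fmt"

const recentCap = 100 // how many committed txn ids each shard keeps

// txnApplier is a hypothetical sketch of a per-shard oplog applier.
type txnApplier struct {
	buffered map[string][]string // txn id -> ops seen so far
	applied  []string            // ops applied to the data
	recent   []string            // last committed txn ids (capped at recentCap)
}

func newTxnApplier() *txnApplier {
	return &txnApplier{buffered: map[string][]string{}}
}

// observe buffers an op belonging to a (possibly distributed) txn.
func (a *txnApplier) observe(txn, op string) {
	a.buffered[txn] = append(a.buffered[txn], op)
}

// commit applies the buffered ops once a commitTransaction is seen,
// treating a distributed txn exactly like a non-distributed one.
func (a *txnApplier) commit(txn string) {
	a.applied = append(a.applied, a.buffered[txn]...)
	delete(a.buffered, txn)
	a.recent = append(a.recent, txn)
	if len(a.recent) > recentCap {
		a.recent = a.recent[1:]
	}
}

// finish resolves txns left without a commit message at the end of the
// oplog: commit those another shard committed, report the rest.
func (a *txnApplier) finish(otherShardsRecent map[string]bool) (leftUncommitted []string) {
	for txn := range a.buffered {
		if otherShardsRecent[txn] {
			a.commit(txn)
		} else {
			leftUncommitted = append(leftUncommitted, txn)
		}
	}
	return leftUncommitted
}

func main() {
	a := newTxnApplier()
	a.observe("txn1", "insert {a:1}")
	a.commit("txn1") // commit seen locally: applied immediately
	a.observe("txn2", "insert {b:2}")
	a.observe("txn3", "insert {c:3}")
	// txn2 was committed on another shard, txn3 nowhere.
	left := a.finish(map[string]bool{"txn2": true})
	fmt.Println("applied:", a.applied)
	fmt.Println("left uncommitted:", left)
}
```

Note this sketch ignores the completeness check (all prepared statements observed) that the real algorithm also requires before committing a leftover.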

It also adds restore stats for distributed transactions (to the restore metadata):
`partial` - the number of transactions that were applied on other shards
but can't be applied on this one since not all prepare messages got into
the oplog (shouldn't happen).
`shard_uncommitted` - the number of transactions uncommitted before the sync:
the transaction is full, but there is no commit message in this shard's oplog.
`left_uncommitted` - the number of transactions that remain uncommitted after
the sync: the transaction is full, but there is no commit message in any
shard's oplog.
dAdAbird committed Jul 5, 2023
1 parent 888de63 commit 3f5ccc4
Showing 17 changed files with 1,038 additions and 857 deletions.
4 changes: 1 addition & 3 deletions agent/agent.go
@@ -109,8 +109,6 @@ func (a *Agent) Start() error {
a.OplogReplay(cmd.Replay, cmd.OPID, ep)
case pbm.CmdResync:
a.Resync(cmd.OPID, ep)
case pbm.CmdPITRestore:
a.PITRestore(cmd.PITRestore, cmd.OPID, ep)
case pbm.CmdDeleteBackup:
a.Delete(cmd.Delete, cmd.OPID, ep)
case pbm.CmdDeletePITR:
@@ -447,7 +445,7 @@ func (a *Agent) acquireLock(l *pbm.Lock, lg *log.Event, acquireFn lockAquireFn)
switch lk.Type {
case pbm.CmdBackup:
fn = a.pbm.MarkBcpStale
case pbm.CmdRestore, pbm.CmdPITRestore:
case pbm.CmdRestore:
fn = a.pbm.MarkRestoreStale
default:
return acquireFn()
170 changes: 0 additions & 170 deletions agent/snapshot.go → agent/backup.go
@@ -9,8 +9,6 @@ import (
"github.com/percona/percona-backup-mongodb/pbm"
"github.com/percona/percona-backup-mongodb/pbm/backup"
"github.com/percona/percona-backup-mongodb/pbm/log"
"github.com/percona/percona-backup-mongodb/pbm/restore"
"github.com/percona/percona-backup-mongodb/pbm/storage"
)

type currentBackup struct {
@@ -285,171 +283,3 @@ func (a *Agent) waitNomination(bcp, rs, node string, l *log.Event) (got bool, er
}
}
}

func (a *Agent) Restore(r *pbm.RestoreCmd, opid pbm.OPID, ep pbm.Epoch) {
if r == nil {
l := a.log.NewEvent(string(pbm.CmdRestore), "", opid.String(), ep.TS())
l.Error("missed command")
return
}

l := a.log.NewEvent(string(pbm.CmdRestore), r.Name, opid.String(), ep.TS())

var bcpType pbm.BackupType

if r.External && r.BackupName == "" {
bcpType = pbm.ExternalBackup
} else {
l.Info("backup: %s", r.BackupName)
var stg storage.Storage
bcp, err := a.pbm.GetBackupMeta(r.BackupName)
if errors.Is(err, pbm.ErrNotFound) {
stg, err = a.pbm.GetStorage(l)
if err != nil {
l.Error("get storage: %v", err)
return
}

bcp, err = restore.GetMetaFromStore(stg, r.BackupName)
}
if err != nil {
l.Error("get backup metadata: %v", err)
return
}
bcpType = bcp.Type
}
var err error
switch bcpType {
case pbm.PhysicalBackup, pbm.IncrementalBackup, pbm.ExternalBackup:
err = a.restorePhysical(r, opid, ep, l)
case pbm.LogicalBackup:
fallthrough
default:
err = a.restoreLogical(r, opid, ep, l)
}

if err != nil {
l.Error("%v", err)
return
}
}

// restoreLogical starts the restore
func (a *Agent) restoreLogical(r *pbm.RestoreCmd, opid pbm.OPID, ep pbm.Epoch, l *log.Event) error {
nodeInfo, err := a.node.GetInfo()
if err != nil {
return errors.Wrap(err, "get node info")
}
if !nodeInfo.IsPrimary {
return errors.New("node is not primary so it's unsuitable to do restore")
}

epts := ep.TS()
lock := a.pbm.NewLock(pbm.LockHeader{
Type: pbm.CmdRestore,
Replset: nodeInfo.SetName,
Node: nodeInfo.Me,
OPID: opid.String(),
Epoch: &epts,
})

got, err := a.acquireLock(lock, l, nil)
if err != nil {
return errors.Wrap(err, "acquiring lock")
}
if !got {
l.Debug("skip: lock not acquired")
return errors.New("unable to run the restore while another operation is running")
}

defer func() {
l.Debug("releasing lock")
err := lock.Release()
if err != nil {
l.Error("release lock: %v", err)
}
}()

l.Info("restore started")
err = restore.New(a.pbm, a.node, r.RSMap).Snapshot(r, opid, l)
if err != nil {
if errors.Is(err, restore.ErrNoDataForShard) {
l.Info("no data for the shard in backup, skipping")
return nil
}

return err
}
l.Info("restore finished successfully")

if nodeInfo.IsLeader() {
epch, err := a.pbm.ResetEpoch()
if err != nil {
return errors.Wrap(err, "reset epoch")
}

l.Debug("epoch set to %v", epch)
}

return nil
}

// restorePhysical starts the physical restore
func (a *Agent) restorePhysical(r *pbm.RestoreCmd, opid pbm.OPID, ep pbm.Epoch, l *log.Event) error {
nodeInfo, err := a.node.GetInfo()
if err != nil {
return errors.Wrap(err, "get node info")
}

rstr, err := restore.NewPhysical(a.pbm, a.node, nodeInfo, r.RSMap)
if err != nil {
return errors.Wrap(err, "init physical backup")
}

// physical restore runs on all nodes in the replset
// so we try lock only on primary only to be sure there
// is no concurrent operation running.
var lock *pbm.Lock
if nodeInfo.IsPrimary {
epts := ep.TS()
lock = a.pbm.NewLock(pbm.LockHeader{
Type: pbm.CmdRestore,
Replset: nodeInfo.SetName,
Node: nodeInfo.Me,
OPID: opid.String(),
Epoch: &epts,
})

got, err := a.acquireLock(lock, l, nil)
if err != nil {
return errors.Wrap(err, "acquiring lock")
}
if !got {
l.Debug("skip: lock not acquired")
return errors.New("unable to run the restore while another operation is running")
}
}

if lock != nil {
// Errors are ignored: the lock will disappear after the
// restore anyway, and the commands stream is down as well.
// The lock also updates its heartbeats, but Restore waits for only one state
// with a timeout twice as short as pbm.StaleFrameSec.
lock.Release()
}

l.Info("restore started")
err = rstr.Snapshot(r, opid, l, a.closeCMD, a.HbPause)
l.Info("restore finished %v", err)
if err != nil {
if errors.Is(err, restore.ErrNoDataForShard) {
l.Info("no data for the shard in backup, skipping")
return nil
}

return err
}
l.Info("restore finished successfully")

return nil
}
126 changes: 92 additions & 34 deletions agent/pitr.go → agent/restore.go
@@ -258,57 +258,116 @@ func (a *Agent) pitrLockCheck() (moveOn bool, err error) {
return tl.Heartbeat.T+pbm.StaleFrameSec < ts.T, nil
}

// PITRestore starts the point-in-time recovery
func (a *Agent) PITRestore(r *pbm.PITRestoreCmd, opid pbm.OPID, ep pbm.Epoch) {
func (a *Agent) Restore(r *pbm.RestoreCmd, opid pbm.OPID, ep pbm.Epoch) {
if r == nil {
l := a.log.NewEvent(string(pbm.CmdPITRestore), "", opid.String(), ep.TS())
l := a.log.NewEvent(string(pbm.CmdRestore), "", opid.String(), ep.TS())
l.Error("missed command")
return
}

l := a.log.NewEvent(string(pbm.CmdPITRestore), r.Name, opid.String(), ep.TS())
l := a.log.NewEvent(string(pbm.CmdRestore), r.Name, opid.String(), ep.TS())

l.Info("to time: %s", time.Unix(r.TS, 0).UTC().Format(time.RFC3339))
if !r.OplogTS.IsZero() {
l.Info("to time: %s", time.Unix(int64(r.OplogTS.T), 0).UTC().Format(time.RFC3339))
}

nodeInfo, err := a.node.GetInfo()
if err != nil {
l.Error("get node info: %v", err)
return
}
if !nodeInfo.IsPrimary {
l.Info("Node is not suitable for restore")
return
}

epts := ep.TS()
lock := a.pbm.NewLock(pbm.LockHeader{
Type: pbm.CmdPITRestore,
Replset: nodeInfo.SetName,
Node: nodeInfo.Me,
OPID: opid.String(),
Epoch: &epts,
})
var lock *pbm.Lock
if nodeInfo.IsPrimary {
epts := ep.TS()
lock = a.pbm.NewLock(pbm.LockHeader{
Type: pbm.CmdRestore,
Replset: nodeInfo.SetName,
Node: nodeInfo.Me,
OPID: opid.String(),
Epoch: &epts,
})

got, err := a.acquireLock(lock, l, nil)
if err != nil {
l.Error("acquiring lock: %v", err)
return
got, err := a.acquireLock(lock, l, nil)
if err != nil {
l.Error("acquiring lock: %v", err)
return
}
if !got {
l.Debug("skip: lock not acquired")
l.Error("unable to run the restore while another backup or restore process running")
return
}

defer func() {
if lock == nil {
return
}
err := lock.Release()
if err != nil {
l.Error("release lock: %v", err)
}
}()
}
if !got {
l.Debug("skip: lock not acquired")
l.Error("unable to run the restore while another backup or restore process running")

stg, err := a.pbm.GetStorage(l)
if err != nil {
l.Error("get storage: %v", err)
return
}

defer func() {
err := lock.Release()
var bcpType pbm.BackupType
bcp := &pbm.BackupMeta{}

if r.External && r.BackupName == "" {
bcpType = pbm.ExternalBackup
} else {
l.Info("backup: %s", r.BackupName)
if !r.OplogTS.IsZero() {
bcp, err = restore.GetBaseBackup(a.pbm, r.BackupName, r.OplogTS, stg)
} else {
bcp, err = restore.SnapshotMeta(a.pbm, r.BackupName, stg)
}
if err != nil {
l.Error("release lock: %v", err)
l.Error("define base backup: %v", err)
return
}
}()
bcpType = bcp.Type
}

l.Info("recovery started")
err = restore.New(a.pbm, a.node, r.RSMap).PITR(r, opid, l)

switch bcpType {
case pbm.LogicalBackup:
if !nodeInfo.IsPrimary {
l.Info("Node is not suitable for restore")
return
}
if r.OplogTS.IsZero() {
err = restore.New(a.pbm, a.node, r.RSMap).Snapshot(r, opid, l)
} else {
err = restore.New(a.pbm, a.node, r.RSMap).PITR(r, opid, l)
}
case pbm.PhysicalBackup, pbm.IncrementalBackup, pbm.ExternalBackup:
if lock != nil {
// Errors are ignored: the lock will disappear after the
// restore anyway, and the commands stream is down as well.
// The lock also updates its heartbeats, but Restore waits for only one state
// with a timeout twice as short as pbm.StaleFrameSec.
_ = lock.Release()
lock = nil
}

var rstr *restore.PhysRestore
rstr, err = restore.NewPhysical(a.pbm, a.node, nodeInfo, r.RSMap)
if err != nil {
l.Error("init physical backup: %v", err)
return
}

r.BackupName = bcp.Name
err = rstr.Snapshot(r, r.OplogTS, opid, l, a.closeCMD, a.HbPause)
}
if err != nil {
if errors.Is(err, restore.ErrNoDataForShard) {
l.Info("no data for the shard in backup, skipping")
@@ -317,15 +376,14 @@ func (a *Agent) PITRestore(r *pbm.PITRestoreCmd, opid pbm.OPID, ep pbm.Epoch) {
}
return
}
l.Info("recovery successfully finished")

if nodeInfo.IsLeader() {
if bcpType == pbm.LogicalBackup && nodeInfo.IsLeader() {
epch, err := a.pbm.ResetEpoch()
if err != nil {
l.Error("reset epoch")
return
l.Error("reset epoch: %v", err)
}

l.Debug("epoch set to %v", epch)
}

l.Info("recovery successfully finished")
}
2 changes: 1 addition & 1 deletion cli/cli.go
Expand Up @@ -169,7 +169,7 @@ func Main() {
logsCmd.Flag("tail", "Show last N entries, 20 entries are shown by default, 0 for all logs").Short('t').Default("20").Int64Var(&logs.tail)
logsCmd.Flag("node", "Target node in format replset[/host:port]").Short('n').StringVar(&logs.node)
logsCmd.Flag("severity", "Severity level D, I, W, E or F, low to high. Choosing one includes higher levels too.").Short('s').Default("I").EnumVar(&logs.severity, "D", "I", "W", "E", "F")
logsCmd.Flag("event", "Event in format backup[/2020-10-06T11:45:14Z]. Events: backup, restore, cancelBackup, resync, pitr, pitrestore, delete").Short('e').StringVar(&logs.event)
logsCmd.Flag("event", "Event in format backup[/2020-10-06T11:45:14Z]. Events: backup, restore, cancelBackup, resync, pitr, delete").Short('e').StringVar(&logs.event)
logsCmd.Flag("opid", "Operation ID").Short('i').StringVar(&logs.opid)
logsCmd.Flag("timezone", "Timezone of log output. `Local`, `UTC` or a location name corresponding to a file in the IANA Time Zone database, such as `America/New_York`").StringVar(&logs.location)
logsCmd.Flag("extra", "Show extra data in text format").Hidden().Short('x').BoolVar(&logs.extr)
