Skip to content

Commit

Permalink
PBM-815: remove unsued code
Browse files Browse the repository at this point in the history
  • Loading branch information
dAdAbird committed Jul 5, 2023
1 parent 6e43e43 commit 9528b0c
Show file tree
Hide file tree
Showing 5 changed files with 3 additions and 199 deletions.
2 changes: 1 addition & 1 deletion agent/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func (a *Agent) Restore(r *pbm.RestoreCmd, opid pbm.OPID, ep pbm.Epoch) {
// restore. And the commands stream is down as well.
// The lock also updates its heartbeats but Restore waits only for one state
// with the timeout twice as short pbm.StaleFrameSec.
lock.Release()
_ = lock.Release()
lock = nil
}

Expand Down
51 changes: 0 additions & 51 deletions cli/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,14 @@ package cli

import (
"bufio"
"context"
"fmt"
"io"
"os"
"strings"
"time"

"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"

"github.com/percona/percona-backup-mongodb/pbm"
)
Expand Down Expand Up @@ -347,50 +343,3 @@ func askCleanupConfirmation(info pbm.CleanupInfo) (bool, error) {

return false, nil
}

func findAdjustedTS(ctx context.Context, m *mongo.Client, ts primitive.Timestamp, strict bool) (primitive.Timestamp, error) {
if strict {
rt, err := findBaseSnapshotRestoreTS(ctx, m, bson.M{"$gte": ts}, bson.D{{"last_write_ts", 1}})
if err != nil {
if errors.Is(err, mongo.ErrNoDocuments) {
err = nil
}
return primitive.Timestamp{}, err
}
if rt.Equal(ts) {
return ts, nil
}
}

rt, err := findBaseSnapshotRestoreTS(ctx, m, bson.M{"$lt": ts}, bson.D{{"last_write_ts", -1}})
if err != nil {
if errors.Is(err, mongo.ErrNoDocuments) {
return ts, nil
}
return primitive.Timestamp{}, err
}

return rt, nil
}

func findBaseSnapshotRestoreTS(ctx context.Context, m *mongo.Client, lastWrite any, sort bson.D) (primitive.Timestamp, error) {
filter := bson.D{
{"nss", nil},
{"status", pbm.StatusDone},
{"type", pbm.LogicalBackup},
{"last_write_ts", lastWrite},
}
options := options.FindOne().
SetProjection(bson.D{{"last_write_ts", 1}}).
SetSort(sort)
cur := m.Database(pbm.DB).Collection(pbm.BcpCollection).FindOne(ctx, filter, options)
if err := cur.Err(); err != nil {
return primitive.Timestamp{}, errors.WithMessage(err, "query")
}

var r struct {
RestoreTS primitive.Timestamp `bson:"last_write_ts"`
}
err := cur.Decode(&r)
return r.RestoreTS, errors.WithMessage(err, "decode")
}
10 changes: 0 additions & 10 deletions pbm/oplog/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,16 +345,6 @@ func (o *OplogRestore) handleOp(oe db.Oplog) error {
return nil
}

func isPrepareTxn(op *db.Oplog) bool {
for _, v := range op.Object {
if v.Key == "prepare" {
return true
}
}

return false
}

func isTxnOps(op *db.Oplog) bool {
for _, v := range op.Object {
if v.Key == "applyOps" {
Expand Down
23 changes: 2 additions & 21 deletions pbm/restore/logical.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,8 @@ type Restore struct {
// empty if all shard names are the same
sMap map[string]string

oplog *oplog.OplogRestore
log *log.Event
opid string
log *log.Event
opid string

indexCatalog *idx.IndexCatalog
}
Expand Down Expand Up @@ -1029,24 +1028,6 @@ func (r *Restore) snapshot(input io.Reader) (err error) {
return err
}

func (r *Restore) replayChunk(file string, c compress.CompressionType) (lts primitive.Timestamp, err error) {
or, err := r.stg.SourceReader(file)
if err != nil {
return lts, errors.Wrapf(err, "get object %s form the storage", file)
}
defer or.Close()

oplogReader, err := compress.Decompress(or, c)
if err != nil {
return lts, errors.Wrapf(err, "decompress object %s", file)
}
defer oplogReader.Close()

lts, err = r.oplog.Apply(oplogReader)

return lts, errors.Wrap(err, "apply oplog for chunk")
}

// Done waits for the replicas to finish the job
// and marks restore as done
func (r *Restore) Done() error {
Expand Down
116 changes: 0 additions & 116 deletions pbm/restore/physical.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/percona/percona-backup-mongodb/pbm"
"github.com/percona/percona-backup-mongodb/pbm/compress"
"github.com/percona/percona-backup-mongodb/pbm/log"
"github.com/percona/percona-backup-mongodb/pbm/oplog"
"github.com/percona/percona-backup-mongodb/pbm/storage"
"github.com/percona/percona-backup-mongodb/pbm/storage/s3"
"github.com/percona/percona-backup-mongodb/version"
Expand Down Expand Up @@ -1545,79 +1544,6 @@ func (r *PhysRestore) agreeCommonRestoreTS() (ts primitive.Timestamp, err error)
return ts, nil
}

func (r *PhysRestore) checkTxn(txn pbm.RestoreTxn) (pbm.TxnState, error) {
shardsToFinish := len(r.syncPathShards)
for f := range r.syncPathShards {
isDone := false
dr, err := r.stg.FileStat(f + "." + string(pbm.StatusDone))
if err != nil && !errors.Is(err, storage.ErrNotExist) {
return pbm.TxnUnknown, errors.Wrapf(err, "check done for <%s>", f)
}
if err == nil && dr.Size != 0 {
isDone = true
}

txnr, err := r.stg.SourceReader(f + ".txn")
if err != nil && errors.Is(err, storage.ErrNotExist) {
if isDone {
shardsToFinish--
}
continue
}
if err != nil {
return pbm.TxnUnknown, errors.Wrapf(err, "get status file <%s>", f)
}

b, err := io.ReadAll(txnr)
if err != nil {
return pbm.TxnUnknown, errors.Wrapf(err, "read status file <%s>", f)
}

t := &pbm.RestoreTxn{}
err = t.Decode(b)
if err != nil {
return pbm.TxnUnknown, errors.Wrapf(err, "decode file <%s>", f)
}

if primitive.CompareTimestamp(t.Ctime, txn.Ctime) == 1 {
shardsToFinish--
continue
}

if t.ID != txn.ID {
if isDone {
shardsToFinish--
continue
}
return pbm.TxnUnknown, nil
}

switch t.State {
case pbm.TxnPrepare:
if isDone {
return pbm.TxnAbort, nil
}
return pbm.TxnUnknown, nil
case pbm.TxnAbort:
return pbm.TxnAbort, nil
case pbm.TxnCommit:
shardsToFinish--
}
}

if shardsToFinish == 0 {
return pbm.TxnCommit, nil
}

return pbm.TxnUnknown, nil
}

func (r *PhysRestore) setTxn(txn pbm.RestoreTxn) error {
return r.stg.Save(r.syncPathRS+".txn",
bytes.NewReader(txn.Encode()), -1,
)
}

func (r *PhysRestore) setcommittedTxn(txn []pbm.RestoreTxn) error {
if txn == nil {
txn = []pbm.RestoreTxn{}
Expand Down Expand Up @@ -1671,48 +1597,6 @@ func (r *PhysRestore) getcommittedTxn() (map[string]primitive.Timestamp, error)
return txn, nil
}

func (r *PhysRestore) checkWaitingTxns(observedTxn map[string]struct{}, o *oplog.OplogRestore) error {
for f := range r.syncPathShards {
txnr, err := r.stg.SourceReader(f + ".txn")
if err != nil && errors.Is(err, storage.ErrNotExist) {
continue
}
if err != nil {
return errors.Wrapf(err, "get status file <%s>", f)
}

b, err := io.ReadAll(txnr)
if err != nil {
return errors.Wrapf(err, "read status file <%s>", f)
}

t := &pbm.RestoreTxn{}
err = t.Decode(b)
if err != nil {
return errors.Wrapf(err, "decode file <%s>", f)
}

if _, ok := observedTxn[t.ID]; t.State == pbm.TxnCommit && !ok {
ts := primitive.Timestamp{o.LastOpTS(), 0}
if primitive.CompareTimestamp(ts, t.Ctime) > 0 {
err := r.setTxn(pbm.RestoreTxn{
ID: "noop",
Ctime: ts,
State: pbm.TxnUnknown,
})
if err != nil {
return errors.Wrap(err, "set current op timestamp")
}
}

observedTxn[t.ID] = struct{}{}
return nil
}
}

return nil
}

// Tries to connect to mongo n times, timeout is applied for each try.
// If a try is unsuccessful, it will check the mongo logs and retry if
// there are no errors or fatals.
Expand Down

0 comments on commit 9528b0c

Please sign in to comment.