Skip to content

Commit

Permalink
Merge pull request #72 from nixys/fix/delivery_errors
Browse files Browse the repository at this point in the history
fix: Delivery backups logic
  • Loading branch information
randreev1321 committed Jul 22, 2024
2 parents 2acb463 + 06af9be commit 658bcce
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 22 deletions.
37 changes: 20 additions & 17 deletions interfaces/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,25 +57,28 @@ func (s Storages) Delivery(logCh chan logger.LogRecord, job Job) error {
errs := new(multierror.Error)

for ofs, dumpObj := range job.GetDumpObjects() {
if !dumpObj.Delivered {
startTime := time.Now()
ok := float64(0)
for _, st := range s {
if err := st.DeliveryBackup(logCh, job.GetName(), dumpObj.TmpFile, ofs, string(job.GetType())); err != nil {
errs = multierror.Append(errs, err)
}
}
if errs.Len() == 0 {
ok = float64(1)
}
if errs.Len() < len(s) {
job.SetDumpObjectDelivered(ofs)
if dumpObj.Delivered {
continue
}
deliveryErrs := new(multierror.Error)
startTime := time.Now()
ok := float64(0)
for _, st := range s {
if err := st.DeliveryBackup(logCh, job.GetName(), dumpObj.TmpFile, ofs, string(job.GetType())); err != nil {
deliveryErrs = multierror.Append(deliveryErrs, err)
}
job.SetOfsMetrics(ofs, map[string]float64{
metrics.DeliveryOk: ok,
metrics.DeliveryTime: float64(time.Since(startTime).Nanoseconds() / 1e6),
})
}
if deliveryErrs.Len() == 0 {
ok = float64(1)
}
job.SetOfsMetrics(ofs, map[string]float64{
metrics.DeliveryOk: ok,
metrics.DeliveryTime: float64(time.Since(startTime).Nanoseconds() / 1e6),
})
if deliveryErrs.Len() < len(s) {
job.SetDumpObjectDelivered(ofs)
}
errs = multierror.Append(errs, deliveryErrs.ErrorOrNil())
}

return errs.ErrorOrNil()
Expand Down
9 changes: 4 additions & 5 deletions modules/storage/ftp/ftp.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,6 @@ func (f *FTP) IsLocal() int { return 0 }
func (f *FTP) DeliveryBackup(logCh chan logger.LogRecord, jobName, tmpBackupFile, ofs string, bakType string) error {
var bakRemPaths, mtdRemPaths []string

if err := f.updateConn(); err != nil {
return err
}

if bakType == string(misc.IncFiles) {
bakRemPaths, mtdRemPaths = GetIncBackupDstList(tmpBackupFile, ofs, f.backupPath)
} else {
Expand Down Expand Up @@ -142,9 +138,12 @@ func (f *FTP) copy(logCh chan logger.LogRecord, job, dst, src string) error {
}
defer func() { _ = srcFile.Close() }()

if err = f.updateConn(); err != nil {
return err
}
err = f.conn.Stor(dst, srcFile)
if err != nil {
logCh <- logger.Log(job, f.name).Errorf("Unable to upload file: %s", err)
logCh <- logger.Log(job, f.name).Errorf("Unable to upload file '%s'. %s", dst, err)
return err
}

Expand Down

0 comments on commit 658bcce

Please sign in to comment.