Skip to content

Commit

Permalink
Fixed a deadlock problem with very large directory blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
fredli74 committed May 11, 2020
1 parent db08402 commit 3075f02
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 51 deletions.
16 changes: 7 additions & 9 deletions core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,15 +467,13 @@ func (c *Client) StoreBlock(block *HashboxBlock) Byte128 {
if c.closing {
c.dispatchMutex.Unlock()
panic(errors.New("Connection closed"))
} else if c.blockqueuesize+bytearray.ChunkQuantize(int64(block.UncompressedSize))*2 < c.QueueMax {
if c.blockbuffer[block.BlockID] == nil {
c.blockbuffer[block.BlockID] = block
c.blockqueuesize += bytearray.ChunkQuantize(int64(block.UncompressedSize))
} else {
block.Release()
c.dispatchMutex.Unlock()
return block.BlockID
}
} else if c.blockbuffer[block.BlockID] != nil {
block.Release()
c.dispatchMutex.Unlock()
return block.BlockID
} else if c.blockqueuesize == 0 || c.blockqueuesize+bytearray.ChunkQuantize(int64(block.UncompressedSize))*2 < c.QueueMax {
c.blockbuffer[block.BlockID] = block
c.blockqueuesize += bytearray.ChunkQuantize(int64(block.UncompressedSize))
full = false
}
c.dispatchMutex.Unlock()
Expand Down
5 changes: 5 additions & 0 deletions core/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"syscall"
"time"
)
Expand Down Expand Up @@ -100,7 +101,11 @@ const (
var LogLevel int = LogInfo
var logMarks []string = []string{"!", "*", ".", "(", "?"}

var LogMutex sync.Mutex

func Log(level int, format string, a ...interface{}) {
LogMutex.Lock()
defer LogMutex.Unlock()
if level <= LogLevel {
fmt.Printf("%s %s "+format+"\n", append([]interface{}{time.Now().UTC().Format(LOGTIMEFORMAT), logMarks[level]}, a...)...)
}
Expand Down
15 changes: 4 additions & 11 deletions hashback/hashback.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const PROGRESS_INTERVAL_SECS = 10 * time.Second
const MAX_BLOCK_SIZE int = 8 * 1024 * 1024 // 8MiB max blocksize
const MIN_BLOCK_SIZE int = 64 * 1024 // 64kb minimum blocksize (before splitting it)

var SendingQueueSize int64 = 48 * 1024 * 1024 // 48 MiB max memory (will use up to CPU*MAX_BLOCK_SIZE more when compressing)
var SendingQueueSize int64 = 48 * 1024 * 1024 // Try to keep queue to 48 MiB (can use more for directory blocks and will use up to an additional CPU*MAX_BLOCK_SIZE when compressing)

// DefaultIgnoreList populated by init() function from each platform
var DefaultIgnoreList []string
Expand All @@ -55,14 +55,6 @@ func PanicOn(err error) {
}
}

func Debug(format string, a ...interface{}) {
if DEBUG {
fmt.Print("DEBUG: ")
fmt.Printf(format, a...)
fmt.Println()
}
}

func SoftError(err error) {
fmt.Println("!!!", err)
}
Expand Down Expand Up @@ -281,7 +273,7 @@ func (session *BackupSession) Connect() *core.Client {
// fmt.Printf("%x\n", *session.AccessKey)
// fmt.Printf("%x\n", *session.BackupKey)

Debug("Connecting to %s", session.ServerString)
core.Log(core.LogTrace, "Connecting to %s", session.ServerString)
client := core.NewClient(session.ServerString, session.User, *session.AccessKey)
client.QueueMax = SendingQueueSize
client.EnablePaint = session.Paint
Expand All @@ -298,7 +290,7 @@ func (session *BackupSession) Close(polite bool) {
session.Client = nil
}

Debug("Disconnected from %s", session.ServerString)
core.Log(core.LogTrace, "Disconnected from %s", session.ServerString)
}

func (session *BackupSession) Log(format string, a ...interface{}) {
Expand Down Expand Up @@ -388,6 +380,7 @@ func main() {
go func() {
core.Log(core.LogInfo, "%v", http.ListenAndServe(":6060", nil))
}()
core.LogLevel = core.LogTrace
}
})

Expand Down
64 changes: 33 additions & 31 deletions hashback/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,26 +64,26 @@ func minorError(r interface{}) error {

func compareEntries(fileInfo os.FileInfo, new *FileEntry, old *FileEntry) bool {
if old.FileName != new.FileName {
Debug("FileName is different: %s != %s", new.FileName, old.FileName)
core.Log(core.LogTrace, "FileName is different: %s != %s", new.FileName, old.FileName)
return false
}
if old.FileSize != new.FileSize {
Debug("FileSize is different: %d != %d", new.FileSize, old.FileSize)
core.Log(core.LogTrace, "FileSize is different: %d != %d", new.FileSize, old.FileSize)
return false
}
if isOfflineFile(fileInfo) {
if old.ModTime/1e9 != new.ModTime/1e9 {
// compare with second precision because of Dropbox Online Only files
Debug("ModTime is different (OFFLINE FILE): %d != %d", new.ModTime, old.ModTime)
core.Log(core.LogTrace, "ModTime is different (OFFLINE FILE): %d != %d", new.ModTime, old.ModTime)
return false
}
} else {
if old.FileMode != new.FileMode {
Debug("FileMode is different: %d != %d", new.FileMode, old.FileMode)
core.Log(core.LogTrace, "FileMode is different: %d != %d", new.FileMode, old.FileMode)
return false
}
if old.ModTime != new.ModTime {
Debug("ModTime is different: %d != %d", new.ModTime, old.ModTime)
core.Log(core.LogTrace, "ModTime is different: %d != %d", new.ModTime, old.ModTime)
}
}
return true
Expand Down Expand Up @@ -118,7 +118,7 @@ func (session *BackupSession) storeFile(path string, entry *FileEntry) (err erro
defer fileData.Release()

for offset := int64(0); offset < int64(entry.FileSize); {
Debug("storeFile(%s) offset %d", path, offset)
core.Log(core.LogTrace, "storeFile(%s) offset %d", path, offset)

session.PrintStoreProgress(PROGRESS_INTERVAL_SECS)

Expand Down Expand Up @@ -172,7 +172,7 @@ func (session *BackupSession) storeFile(path string, entry *FileEntry) (err erro
}
}

// Split an swap
// Split and swap
right := fileData.Split(splitPosition)
blockData = fileData
fileData = right
Expand All @@ -184,6 +184,8 @@ func (session *BackupSession) storeFile(path string, entry *FileEntry) (err erro
var datakey core.Byte128

id := session.Client.StoreData(core.BlockDataTypeZlib, blockData, nil)
core.Log(core.LogTrace, "split at %d, store %d bytes as %x", offset, blockData.Len(), id)

links = append(links, id)
chain.ChainBlocks = append(chain.ChainBlocks, id)
chain.DecryptKeys = append(chain.DecryptKeys, datakey)
Expand Down Expand Up @@ -257,7 +259,7 @@ func (session *BackupSession) entryFromFileInfo(fileInfo os.FileInfo) *FileEntry
func (session *BackupSession) storePath(path string, toplevel bool) (entry *FileEntry, err error) {
session.PrintStoreProgress(PROGRESS_INTERVAL_SECS)

Debug("storePath %s", path)
core.Log(core.LogDebug, "storePath %s", path)
// Get file info from disk
var info os.FileInfo
{
Expand All @@ -269,7 +271,7 @@ func (session *BackupSession) storePath(path string, toplevel bool) (entry *File
info, err = os.Lstat(path) // At all other levels we do not
}
if info != nil {
Debug("%+v", info)
core.Log(core.LogTrace, "%+v", info)
isDir = info.IsDir() // Check ignore even if we cannot open the file (to avoid output errors on files we already ignore)
}

Expand Down Expand Up @@ -370,7 +372,7 @@ func (session *BackupSession) storePath(path string, toplevel bool) (entry *File
updatedInfo, err = os.Lstat(path) // At all other levels we do not
}
if updatedInfo != nil {
Debug("%+v", updatedInfo)
core.Log(core.LogTrace, "%+v", updatedInfo)
updated := session.entryFromFileInfo(updatedInfo)

if entry.ModTime != updated.ModTime {
Expand Down Expand Up @@ -581,7 +583,7 @@ func (session *BackupSession) Retention(datasetName string, retainDays int, reta
session.Log("Removing backup %s (%s)", date.Format(time.RFC3339), reason)
session.Client.RemoveDatasetState(datasetName, e.State.StateID)
} else {
Debug("Keeping backup %s", date.Format(time.RFC3339))
core.Log(core.LogDebug, "Keeping backup %s", date.Format(time.RFC3339))
lastbackup = timestamp
}
}
Expand Down Expand Up @@ -651,7 +653,7 @@ func (r *referenceEngine) pushChannelEntry(entry *FileEntry) {
}
select {
case <-r.stopChannel:
Debug("Reference loader received stop signal")
core.Log(core.LogDebug, "Reference loader received stop signal")
r.stopped = true
panic(errors.New("Reference loader was stopped"))
case r.entryChannel <- entry:
Expand Down Expand Up @@ -683,14 +685,14 @@ func (r *referenceEngine) loadResumeFile(filename string) {
defer func() {
if !r.stopped {
if r := recover(); r != nil {
Debug("Non-fatal error encountered while resuming backup %s : %v", filename, r)
core.Log(core.LogDebug, "Non-fatal error encountered while resuming backup %s : %v", filename, r)
}
}
}()
cacheRecover, _ := os.Open(filepath.Join(LocalStoragePath, filename))
if cacheRecover != nil {
defer cacheRecover.Close()
Debug("Opened resume cache %s", cacheRecover.Name())
core.Log(core.LogDebug, "Opened resume cache %s", cacheRecover.Name())

info, err := cacheRecover.Stat()
PanicOn(err)
Expand All @@ -701,7 +703,7 @@ func (r *referenceEngine) loadResumeFile(filename string) {
var resumeID core.Byte128
for offset := int64(0); offset < cacheSize; {
var entry FileEntry
Debug("Read cache entry at %x", offset)
core.Log(core.LogTrace, "Read cache entry at %x", offset)
offset += int64(entry.Unserialize(reader))
if resumeID.Compare(entry.ReferenceID) < 0 {
// We're guessing the resume referenceID just to make changedFiles count a little better
Expand All @@ -719,17 +721,17 @@ func (r *referenceEngine) loadResumeFile(filename string) {
if skipcheck > 0 {
skipcheck++
} else if r.session.Client.VerifyBlock(entry.ContentBlockID) {
Debug("Cache entry for %s verified against server", entry.FileName)
core.Log(core.LogTrace, "Cache entry for %s verified against server", entry.FileName)
skipcheck = 1
}
} else if skipcheck > 0 {
Debug("Skipping cache verification for %s as parent is already verified", entry.FileName)
core.Log(core.LogTrace, "Skipping cache verification for %s as parent is already verified", entry.FileName)
} else if !entry.HasContentBlockID() {
Debug("Cache entry for %s has no content to verify", entry.FileName)
core.Log(core.LogTrace, "Cache entry for %s has no content to verify", entry.FileName)
} else if r.session.Client.VerifyBlock(entry.ContentBlockID) {
Debug("Cache entry for %s verified against server", entry.FileName)
core.Log(core.LogTrace, "Cache entry for %s verified against server", entry.FileName)
} else {
Debug("Unable to verify %s against server", entry.FileName)
core.Log(core.LogTrace, "Unable to verify %s against server", entry.FileName)
continue
}

Expand All @@ -753,15 +755,15 @@ func (r *referenceEngine) loader(rootBlockID *core.Byte128) {
defer func() {
if err := recover(); err != nil {
if r.stopped {
Debug("Reference loader stopped gracefully")
core.Log(core.LogDebug, "Reference loader stopped gracefully")
} else {
Debug("Error: Panic raised in reference loader process (%v)", err)
core.Log(core.LogDebug, "Error: Panic raised in reference loader process (%v)", err)

select {
case r.errorChannel <- fmt.Errorf("Panic raised in reference loader process (%v)", err):
Debug("Reference loader sent error on error channel")
core.Log(core.LogDebug, "Reference loader sent error on error channel")
default:
Debug("Reference loader error channel buffer is full, no message sent")
core.Log(core.LogDebug, "Reference loader error channel buffer is full, no message sent")
}
}
}
Expand Down Expand Up @@ -798,20 +800,20 @@ func (r *referenceEngine) loader(rootBlockID *core.Byte128) {
cacheLast, _ := os.Open(r.cacheFilePathName(*rootBlockID))
if cacheLast != nil {
defer cacheLast.Close()
Debug("Opened local cache %s", cacheLast.Name())
core.Log(core.LogDebug, "Opened local cache %s", cacheLast.Name())

info, err := cacheLast.Stat()
PanicOn(err)
cacheSize := info.Size()
reader := bufio.NewReader(cacheLast)
for offset := int64(0); offset < cacheSize; {
var entry FileEntry
Debug("Read cache entry at %x", offset)
core.Log(core.LogTrace, "Read cache entry at %x", offset)
offset += int64(entry.Unserialize(reader))
r.pushChannelEntry(&entry)
}
} else {
Debug("Downloading block %x to local cache", rootBlockID)
core.Log(core.LogTrace, "Downloading block %x to local cache", rootBlockID)
r.downloadReference(*rootBlockID)
}
}
Expand Down Expand Up @@ -876,13 +878,13 @@ func (r *referenceEngine) findReference(path string) *FileEntry {
for {
p := r.peekPath()
if pathLess(p, path) && r.popReference() != nil {
Debug("Reference %s < %s, roll forward", p, path)
core.Log(core.LogTrace, "Reference %s < %s, roll forward", p, path)
continue
} else if p == path {
Debug("Reference %s == %s", p, path)
core.Log(core.LogTrace, "Reference %s == %s", p, path)
return r.popReference()
} else {
Debug("Reference %s > %s", p, path)
core.Log(core.LogTrace, "Reference %s > %s", p, path)
break
}
}
Expand Down Expand Up @@ -963,7 +965,7 @@ func (r *referenceEngine) Close() {
}
})()

Debug("Saving %s as recovery cache %s", currentName, partialName)
core.Log(core.LogDebug, "Saving %s as recovery cache %s", currentName, partialName)
os.Rename(currentName, partialName)
}
}
Expand Down

0 comments on commit 3075f02

Please sign in to comment.