author | Laria Carolin Chabowski <laria@laria.me> | 2017-09-27 08:30:51 +0200 |
---|---|---|
committer | Laria Carolin Chabowski <laria@laria.me> | 2017-10-03 15:01:38 +0200 |
commit | fec995ab483a4fc6b2859d199d211c25b6661ff7 (patch) | |
tree | 265536634811dbfba1319143995af912307f6a86 | |
parent | b48f1f2c372d1fa7bca155c449c590d852a03511 (diff) | |
download | petrific-fec995ab483a4fc6b2859d199d211c25b6661ff7.tar.gz petrific-fec995ab483a4fc6b2859d199d211c25b6661ff7.tar.bz2 petrific-fec995ab483a4fc6b2859d199d211c25b6661ff7.zip |
WriteDir in parallel
-rw-r--r-- | backup/backup.go | 129 |
1 file changed, 111 insertions, 18 deletions
```diff
diff --git a/backup/backup.go b/backup/backup.go
index f578e6f..84fb554 100644
--- a/backup/backup.go
+++ b/backup/backup.go
@@ -6,23 +6,105 @@ import (
 	"code.laria.me/petrific/objects"
 	"code.laria.me/petrific/storage"
 	"io"
+	"runtime"
+	"sort"
 	"time"
 )
 
+type filesLast []fs.File
+
+func (fl filesLast) Len() int           { return len(fl) }
+func (fl filesLast) Less(i, j int) bool { return fl[i].Type() != fs.FFile && fl[j].Type() == fs.FFile }
+func (fl filesLast) Swap(i, j int)      { fl[i], fl[j] = fl[j], fl[i] }
+
+type writeFileTaskResult struct {
+	file    fs.RegularFile
+	file_id objects.ObjectId
+	err     error
+}
+
+type writeFileTask struct {
+	file   fs.RegularFile
+	result *chan writeFileTaskResult
+}
+
+func (task writeFileTask) process(store storage.Storage) {
+	var result writeFileTaskResult
+	result.file = task.file
+
+	rwc, err := task.file.Open()
+	if err != nil {
+		result.err = err
+		*(task.result) <- result
+		return
+	}
+	defer rwc.Close()
+
+	result.file_id, result.err = WriteFile(store, rwc)
+
+	*(task.result) <- result
+}
+
+type writeDirProcess struct {
+	queue chan writeFileTask
+	store storage.Storage
+}
+
+func (proc writeDirProcess) worker() {
+	for task := range proc.queue {
+		task.process(proc.store)
+	}
+}
+
+func (proc writeDirProcess) enqueue(file fs.RegularFile, result *chan writeFileTaskResult) {
+	go func() {
+		proc.queue <- writeFileTask{file, result}
+	}()
+}
+
+func (proc writeDirProcess) stop() {
+	close(proc.queue)
+}
+
 func WriteDir(
 	store storage.Storage,
 	abspath string,
 	d fs.Dir,
 	pcache cache.Cache,
 ) (objects.ObjectId, error) {
-	children, err := d.Readdir()
+	proc := writeDirProcess{
+		make(chan writeFileTask),
+		store,
+	}
+	defer proc.stop()
+
+	for i := 0; i < runtime.NumCPU(); i++ {
+		go proc.worker()
+	}
+
+	return proc.writeDir(abspath, d, pcache)
+}
+
+func (proc writeDirProcess) writeDir(
+	abspath string,
+	d fs.Dir,
+	pcache cache.Cache,
+) (objects.ObjectId, error) {
+	_children, err := d.Readdir()
 	if err != nil {
 		return objects.ObjectId{}, err
 	}
 
+	file_results := make(chan writeFileTaskResult)
+	wait_for_files := 0
+
+	// fs.FFile entries must be processed at the end, since they are processed concurrently and might otherwise lock us up
+	children := filesLast(_children)
+	sort.Sort(children)
+
 	infos := make(objects.Tree)
 	for _, c := range children {
-		var info objects.TreeEntry
+		var info objects.TreeEntry = nil
 
 		switch c.Type() {
 		case fs.FFile:
@@ -30,23 +112,13 @@ func WriteDir(
 
 			if !ok || mtime.Before(c.ModTime()) {
 				// According to cache the file was changed
-				rwc, err := c.(fs.RegularFile).Open()
-				if err != nil {
-					return objects.ObjectId{}, err
-				}
-
-				file_id, err = WriteFile(store, rwc)
-				rwc.Close()
-				if err != nil {
-					return objects.ObjectId{}, err
-				}
-
-				pcache.SetPathUpdated(abspath+"/"+c.Name(), c.ModTime(), file_id)
+				proc.enqueue(c.(fs.RegularFile), &file_results)
+				wait_for_files++
+			} else {
+				info = objects.NewTreeEntryFile(file_id, c.Executable())
 			}
-
-			info = objects.NewTreeEntryFile(file_id, c.Executable())
 		case fs.FDir:
-			subtree_id, err := WriteDir(store, abspath+"/"+c.Name(), c.(fs.Dir), pcache)
+			subtree_id, err := proc.writeDir(abspath+"/"+c.Name(), c.(fs.Dir), pcache)
 			if err != nil {
 				return objects.ObjectId{}, err
 			}
@@ -66,7 +138,28 @@ func WriteDir(
 		}
 	}
 
-	return storage.SetObject(store, objects.ToRawObject(infos))
+	for ; wait_for_files > 0; wait_for_files-- {
+		result := <-file_results
+		if result.err != nil {
+			err = result.err
+			wait_for_files--
+			break
+		}
+
+		pcache.SetPathUpdated(abspath+"/"+result.file.Name(), result.file.ModTime(), result.file_id)
+		infos[result.file.Name()] = objects.NewTreeEntryFile(result.file_id, result.file.Executable())
+	}
+
+	for ; wait_for_files > 0; wait_for_files-- {
+		// Drain remaining results in case of error
+		<-file_results
+	}
+
+	if err != nil {
+		return objects.ObjectId{}, err
+	}
+
+	return storage.SetObject(proc.store, objects.ToRawObject(infos))
 }
 
 const BlobChunkSize = 16 * 1024 * 1024 // 16MB
```
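The core of the patch is a small fan-out/fan-in worker pool: `WriteDir` starts one worker goroutine per CPU, each ranging over a shared unbuffered task channel, while `writeDir` enqueues one task per changed regular file and later collects exactly one result per enqueued task, remembering the first error and draining the remaining results. The snippet below is a minimal, standalone sketch of that pattern, not petrific code; the `task`/`result` types and `hashFile` are hypothetical stand-ins for `writeFileTask`, `writeFileTaskResult` and `WriteFile`.

```go
package main

import (
	"crypto/sha1"
	"fmt"
	"io"
	"os"
	"runtime"
)

// task and result are hypothetical stand-ins for the patch's
// writeFileTask / writeFileTaskResult pair.
type task struct {
	path    string
	results chan<- result
}

type result struct {
	path string
	sum  [sha1.Size]byte
	err  error
}

// hashFile is a placeholder for the real per-file work (WriteFile in the patch).
func hashFile(path string) (sum [sha1.Size]byte, err error) {
	f, err := os.Open(path)
	if err != nil {
		return sum, err
	}
	defer f.Close()

	h := sha1.New()
	if _, err := io.Copy(h, f); err != nil {
		return sum, err
	}
	copy(sum[:], h.Sum(nil))
	return sum, nil
}

func main() {
	queue := make(chan task)
	defer close(queue) // stops the workers, like writeDirProcess.stop()

	// Fan out: one worker per CPU, all reading from the same queue.
	for i := 0; i < runtime.NumCPU(); i++ {
		go func() {
			for t := range queue {
				sum, err := hashFile(t.path)
				t.results <- result{t.path, sum, err}
			}
		}()
	}

	paths := os.Args[1:]
	results := make(chan result)

	// Enqueue asynchronously so this loop never blocks on the
	// unbuffered queue (the patch does the same in enqueue()).
	for _, p := range paths {
		p := p
		go func() { queue <- task{p, results} }()
	}

	// Fan in: collect exactly one result per enqueued task.
	for range paths {
		r := <-results
		if r.err != nil {
			fmt.Fprintln(os.Stderr, r.path, r.err)
			continue
		}
		fmt.Printf("%x  %s\n", r.sum, r.path)
	}
}
```

As in the patch, the result channel is only read once all tasks for the current batch have been handed out, and errors are noted without abandoning the remaining results, so no worker is left blocked on a send.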
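A smaller detail: the new `filesLast` type is a `sort.Interface` whose `Less` reports every non-file entry as smaller than every file entry, so `sort.Sort` moves all `fs.FFile` entries to the back of the slice; as the comment in the patch notes, files have to come last because they are the entries handed to the concurrent workers. Below is a minimal standalone sketch of the same ordering trick, using a hypothetical `entry` type in place of petrific's `fs.File`.

```go
package main

import (
	"fmt"
	"sort"
)

type kind int

const (
	kindFile kind = iota // stands in for fs.FFile
	kindDir              // stands in for fs.FDir
)

// entry is a hypothetical stand-in for petrific's fs.File interface.
type entry struct {
	name string
	kind kind
}

// filesLast orders entries so that plain files end up after everything else,
// mirroring the Less function introduced by the patch.
type filesLast []entry

func (fl filesLast) Len() int      { return len(fl) }
func (fl filesLast) Swap(i, j int) { fl[i], fl[j] = fl[j], fl[i] }
func (fl filesLast) Less(i, j int) bool {
	return fl[i].kind != kindFile && fl[j].kind == kindFile
}

func main() {
	entries := filesLast{
		{"a.txt", kindFile},
		{"subdir", kindDir},
		{"b.txt", kindFile},
		{"other", kindDir},
	}
	sort.Sort(entries)
	fmt.Println(entries) // non-file entries first, files last
}
```

`sort.Sort` is not stable, but that is enough here: only the two buckets matter, not the order within them.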