aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLaria Carolin Chabowski <laria@laria.me>2017-09-27 08:30:51 +0200
committerLaria Carolin Chabowski <laria@laria.me>2017-10-03 15:01:38 +0200
commitfec995ab483a4fc6b2859d199d211c25b6661ff7 (patch)
tree265536634811dbfba1319143995af912307f6a86
parentb48f1f2c372d1fa7bca155c449c590d852a03511 (diff)
downloadpetrific-fec995ab483a4fc6b2859d199d211c25b6661ff7.tar.gz
petrific-fec995ab483a4fc6b2859d199d211c25b6661ff7.tar.bz2
petrific-fec995ab483a4fc6b2859d199d211c25b6661ff7.zip
WriteDir in parallel
-rw-r--r--backup/backup.go129
1 files changed, 111 insertions, 18 deletions
diff --git a/backup/backup.go b/backup/backup.go
index f578e6f..84fb554 100644
--- a/backup/backup.go
+++ b/backup/backup.go
@@ -6,23 +6,105 @@ import (
"code.laria.me/petrific/objects"
"code.laria.me/petrific/storage"
"io"
+ "runtime"
+ "sort"
"time"
)
+type filesLast []fs.File
+
+func (fl filesLast) Len() int { return len(fl) }
+func (fl filesLast) Less(i, j int) bool { return fl[i].Type() != fs.FFile && fl[j].Type() == fs.FFile }
+func (fl filesLast) Swap(i, j int) { fl[i], fl[j] = fl[j], fl[i] }
+
+type writeFileTaskResult struct {
+ file fs.RegularFile
+ file_id objects.ObjectId
+ err error
+}
+
+type writeFileTask struct {
+ file fs.RegularFile
+ result *chan writeFileTaskResult
+}
+
+func (task writeFileTask) process(store storage.Storage) {
+ var result writeFileTaskResult
+ result.file = task.file
+
+ rwc, err := task.file.Open()
+ if err != nil {
+ result.err = err
+ *(task.result) <- result
+ return
+ }
+ defer rwc.Close()
+
+ result.file_id, result.err = WriteFile(store, rwc)
+
+ *(task.result) <- result
+}
+
+type writeDirProcess struct {
+ queue chan writeFileTask
+ store storage.Storage
+}
+
+func (proc writeDirProcess) worker() {
+ for task := range proc.queue {
+ task.process(proc.store)
+ }
+}
+
+func (proc writeDirProcess) enqueue(file fs.RegularFile, result *chan writeFileTaskResult) {
+ go func() {
+ proc.queue <- writeFileTask{file, result}
+ }()
+}
+
+func (proc writeDirProcess) stop() {
+ close(proc.queue)
+}
+
func WriteDir(
store storage.Storage,
abspath string,
d fs.Dir,
pcache cache.Cache,
) (objects.ObjectId, error) {
- children, err := d.Readdir()
+ proc := writeDirProcess{
+ make(chan writeFileTask),
+ store,
+ }
+ defer proc.stop()
+
+ for i := 0; i < runtime.NumCPU(); i++ {
+ go proc.worker()
+ }
+
+ return proc.writeDir(abspath, d, pcache)
+}
+
+func (proc writeDirProcess) writeDir(
+ abspath string,
+ d fs.Dir,
+ pcache cache.Cache,
+) (objects.ObjectId, error) {
+ _children, err := d.Readdir()
if err != nil {
return objects.ObjectId{}, err
}
+ file_results := make(chan writeFileTaskResult)
+ wait_for_files := 0
+
+ // fs.FFile entries must be processed at the end, since they are processed concurrently and might otherwise lock us up
+ children := filesLast(_children)
+ sort.Sort(children)
+
infos := make(objects.Tree)
for _, c := range children {
- var info objects.TreeEntry
+ var info objects.TreeEntry = nil
switch c.Type() {
case fs.FFile:
@@ -30,23 +112,13 @@ func WriteDir(
if !ok || mtime.Before(c.ModTime()) {
// According to cache the file was changed
- rwc, err := c.(fs.RegularFile).Open()
- if err != nil {
- return objects.ObjectId{}, err
- }
-
- file_id, err = WriteFile(store, rwc)
- rwc.Close()
- if err != nil {
- return objects.ObjectId{}, err
- }
-
- pcache.SetPathUpdated(abspath+"/"+c.Name(), c.ModTime(), file_id)
+ proc.enqueue(c.(fs.RegularFile), &file_results)
+ wait_for_files++
+ } else {
+ info = objects.NewTreeEntryFile(file_id, c.Executable())
}
-
- info = objects.NewTreeEntryFile(file_id, c.Executable())
case fs.FDir:
- subtree_id, err := WriteDir(store, abspath+"/"+c.Name(), c.(fs.Dir), pcache)
+ subtree_id, err := proc.writeDir(abspath+"/"+c.Name(), c.(fs.Dir), pcache)
if err != nil {
return objects.ObjectId{}, err
}
@@ -66,7 +138,28 @@ func WriteDir(
}
}
- return storage.SetObject(store, objects.ToRawObject(infos))
+ for ; wait_for_files > 0; wait_for_files-- {
+ result := <-file_results
+ if result.err != nil {
+ err = result.err
+ wait_for_files--
+ break
+ }
+
+ pcache.SetPathUpdated(abspath+"/"+result.file.Name(), result.file.ModTime(), result.file_id)
+ infos[result.file.Name()] = objects.NewTreeEntryFile(result.file_id, result.file.Executable())
+ }
+
+ for ; wait_for_files > 0; wait_for_files-- {
+ // Drain remaining results in case of error
+ <-file_results
+ }
+
+ if err != nil {
+ return objects.ObjectId{}, err
+ }
+
+ return storage.SetObject(proc.store, objects.ToRawObject(infos))
}
const BlobChunkSize = 16 * 1024 * 1024 // 16MB