diff options
Diffstat (limited to 'backup')
-rw-r--r-- | backup/fsck.go | 294 | ||||
-rw-r--r-- | backup/fsck_test.go | 157 |
2 files changed, 451 insertions, 0 deletions
diff --git a/backup/fsck.go b/backup/fsck.go new file mode 100644 index 0000000..e399d66 --- /dev/null +++ b/backup/fsck.go @@ -0,0 +1,294 @@ +package backup + +import ( + "code.laria.me/petrific/logging" + "code.laria.me/petrific/objects" + "code.laria.me/petrific/storage" + "fmt" + "runtime" + "strings" + "sync" +) + +type FsckProblemType int + +const ( + FsckStorageError FsckProblemType = iota + FsckDeserializationError + FsckUnexpectedBlobSize +) + +type FsckProblem struct { + Id objects.ObjectId + Ancestors []AncestorInfo + ProblemType FsckProblemType + Err error + WantSize, HaveSize int +} + +func (problem FsckProblem) String() string { + desc := "" + + switch problem.ProblemType { + case FsckStorageError: + desc = fmt.Sprintf("Failed retrieving object from storage: %s", problem.Err) + case FsckDeserializationError: + desc = fmt.Sprintf("Object could not be deserialized: %s", problem.Err) + case FsckUnexpectedBlobSize: + desc = fmt.Sprintf("Unexpected blob size: have %d, want %s", problem.HaveSize, problem.WantSize) + } + + ancestors := make([]string, len(problem.Ancestors)) + for i, a := range problem.Ancestors { + ancestors[i] = a.String() + } + + return fmt.Sprintf("%s. Object %s (path: %s)", desc, problem.Id, strings.Join(ancestors, " / ")) +} + +type AncestorInfo struct { + Id objects.ObjectId + Type objects.ObjectType + Name string +} + +func (a AncestorInfo) String() string { + if a.Name == "" { + return string(a.Type) + " " + a.Id.String() + } else { + return fmt.Sprintf("%s of %s %s", a.Name, a.Type, a.Id) + } +} + +type queueElement struct { + Id objects.ObjectId + Ancestors []AncestorInfo + Extra interface{} +} + +type fsckProcess struct { + st storage.Storage + blobs bool + problems chan<- FsckProblem + wait *sync.WaitGroup + queue chan queueElement + seen map[string]struct{} + seenLock sync.Locker + log *logging.Log +} + +func (fsck fsckProcess) onlyUnseen(elems []queueElement) []queueElement { + fsck.seenLock.Lock() + defer fsck.seenLock.Unlock() + + newElems := make([]queueElement, 0, len(elems)) + for _, elem := range elems { + id := elem.Id.String() + _, ok := fsck.seen[id] + if !ok { + newElems = append(newElems, elem) + + fsck.seen[id] = struct{}{} + } + fsck.log.Debug().Printf("seen %s? %t", id, ok) + } + + return newElems +} + +func (fsck fsckProcess) enqueue(elems []queueElement) { + fsck.log.Debug().Printf("enqueueing %d elements", len(elems)) + + elems = fsck.onlyUnseen(elems) + + fsck.wait.Add(len(elems)) + go func() { + for _, elem := range elems { + fsck.log.Debug().Printf("enqueueing %v", elem) + fsck.queue <- elem + } + }() +} + +func (fsck fsckProcess) handle(elem queueElement) { + defer fsck.wait.Done() + + rawobj, err := storage.GetObject(fsck.st, elem.Id) + if err != nil { + fsck.problems <- FsckProblem{ + Id: elem.Id, + Ancestors: elem.Ancestors, + ProblemType: FsckStorageError, + Err: err, + } + + return + } + + obj, err := rawobj.Object() + if err != nil { + fsck.problems <- FsckProblem{ + Id: elem.Id, + Ancestors: elem.Ancestors, + ProblemType: FsckDeserializationError, + Err: err, + } + + return + } + + switch obj.Type() { + case objects.OTBlob: + fsck.handleBlob(elem, obj.(*objects.Blob)) + case objects.OTFile: + fsck.handleFile(elem, obj.(*objects.File)) + case objects.OTTree: + fsck.handleTree(elem, obj.(objects.Tree)) + case objects.OTSnapshot: + fsck.handleSnapshot(elem, obj.(*objects.Snapshot)) + } +} + +func (fsck fsckProcess) handleBlob(elem queueElement, obj *objects.Blob) { + if elem.Extra == nil { + return + } + + want, ok := elem.Extra.(int) + if !ok { + return + } + have := len(*obj) + + if have != want { + fsck.problems <- FsckProblem{ + Id: elem.Id, + Ancestors: elem.Ancestors, + ProblemType: FsckUnexpectedBlobSize, + WantSize: want, + HaveSize: have, + } + } +} + +func (fsck fsckProcess) handleFile(elem queueElement, obj *objects.File) { + if !fsck.blobs { + return + } + + enqueue := make([]queueElement, 0, len(*obj)) + + for _, fragment := range *obj { + enqueue = append(enqueue, queueElement{ + Id: fragment.Blob, + Ancestors: append(elem.Ancestors, AncestorInfo{ + Id: elem.Id, + Type: objects.OTFile, + }), + Extra: int(fragment.Size), + }) + } + + fsck.enqueue(enqueue) +} + +func (fsck fsckProcess) handleTree(elem queueElement, obj objects.Tree) { + ancestors := func(name string) []AncestorInfo { + return append(elem.Ancestors, AncestorInfo{ + Id: elem.Id, + Type: objects.OTTree, + Name: name, + }) + } + + enqueue := make([]queueElement, 0, len(obj)) + + for name, entry := range obj { + switch entry.Type() { + case objects.TETDir: + enqueue = append(enqueue, queueElement{ + Id: entry.(objects.TreeEntryDir).Ref, + Ancestors: ancestors(name), + }) + case objects.TETFile: + enqueue = append(enqueue, queueElement{ + Id: entry.(objects.TreeEntryFile).Ref, + Ancestors: ancestors(name), + }) + } + } + + fsck.enqueue(enqueue) +} + +func (fsck fsckProcess) handleSnapshot(elem queueElement, obj *objects.Snapshot) { + fsck.enqueue([]queueElement{ + {Id: obj.Tree}, + }) +} + +func (fsck fsckProcess) worker(i int) { + for elem := range fsck.queue { + fsck.handle(elem) + } + fsck.log.Debug().Printf("stopping worker %d", i) +} + +// Fsck checks the consistency of objects in a storage +func Fsck( + st storage.Storage, + start *objects.ObjectId, + blobs bool, + problems chan<- FsckProblem, + log *logging.Log, +) error { + proc := fsckProcess{ + st: st, + blobs: blobs, + problems: problems, + wait: new(sync.WaitGroup), + queue: make(chan queueElement), + seen: make(map[string]struct{}), + seenLock: new(sync.Mutex), + log: log, + } + + enqueue := []queueElement{} + + if start == nil { + types := []objects.ObjectType{ + objects.OTFile, + objects.OTTree, + objects.OTSnapshot, + } + for _, t := range types { + ids, err := st.List(t) + if err != nil { + return err + } + + for _, id := range ids { + enqueue = append(enqueue, queueElement{Id: id}) + } + } + } else { + enqueue = []queueElement{ + {Id: *start}, + } + } + + if len(enqueue) == 0 { + return nil + } + + for i := 0; i < runtime.NumCPU(); i++ { + log.Debug().Printf("starting worker %d", i) + go proc.worker(i) + } + + proc.enqueue(enqueue) + + proc.wait.Wait() + close(proc.queue) + return nil +} diff --git a/backup/fsck_test.go b/backup/fsck_test.go new file mode 100644 index 0000000..f42dab3 --- /dev/null +++ b/backup/fsck_test.go @@ -0,0 +1,157 @@ +package backup + +import ( + "code.laria.me/petrific/logging" + "code.laria.me/petrific/objects" + "code.laria.me/petrific/storage/memory" + "testing" +) + +func TestHealthy(t *testing.T) { + st := memory.NewMemoryStorage() + st.Set(objid_emptyfile, objects.OTFile, obj_emptyfile) + st.Set(objid_fooblob, objects.OTBlob, obj_fooblob) + st.Set(objid_foofile, objects.OTFile, obj_foofile) + st.Set(objid_emptytree, objects.OTTree, obj_emptytree) + st.Set(objid_subtree, objects.OTTree, obj_subtree) + st.Set(objid_testtree, objects.OTTree, obj_testtree) + + problems := make(chan FsckProblem) + var err error + go func() { + err = Fsck(st, nil, true, problems, logging.NewNopLog()) + close(problems) + }() + + for p := range problems { + t.Errorf("Unexpected problem: %s", p) + } + + if err != nil { + t.Errorf("Unexpected error: %s", err) + } +} + +var ( + // snapshot with missing tree object + obj_corrupt_snapshot_1 = []byte("" + + "snapshot 162\n" + + "== BEGIN SNAPSHOT ==\n" + + "archive foo\n" + + "date 2018-01-06T22:42:00+01:00\n" + + "tree sha3-256:f000000000000000000000000000000000000000000000000000000000000000\n" + + "== END SNAPSHOT ==\n") + objid_corrupt_snapshot_1 = objects.MustParseObjectId("sha3-256:e33ad8ed4ef309099d593d249b36f2a5377dd26aeb18479695763fec514f519e") + + missing_objid_corrupt_snapshot_1 = objects.MustParseObjectId("sha3-256:f000000000000000000000000000000000000000000000000000000000000000") + + obj_corrupt_snapshot_2 = []byte("" + + "snapshot 162\n" + + "== BEGIN SNAPSHOT ==\n" + + "archive foo\n" + + "date 2018-01-06T22:45:00+01:00\n" + + "tree sha3-256:086f877d9e0760929c0a528ca3a01a7a19c03176a132cc6f4894c69b5943d543\n" + + "== END SNAPSHOT ==\n") + objid_corrupt_snapshot_2 = objects.MustParseObjectId("sha3-256:d5da78d96bb1bc7bff1f7cee509dba084b54ff4b9af0ed23a6a14437765ac81f") + + obj_corrupt_tree = []byte("tree 431\n" + + "name=invalidhash&ref=sha3-256:8888888888888888888888888888888888888888888888888888888888888888&type=file\n" + + "name=invalidserialization&ref=sha3-256:7c3c1c331531a80d0e37a6066a6a4e4881eb897f1d76aeffd86a3bd96f0c054f&type=file\n" + + "name=lengthmismatch&ref=sha3-256:caea41322f4e02d68a15abe3a867c9ab507674a1f70abc892a162c5b3a742349&type=file\n" + + "name=missingobj&ref=sha3-256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff&type=file\n") + objid_corrupt_tree = objects.MustParseObjectId("sha3-256:086f877d9e0760929c0a528ca3a01a7a19c03176a132cc6f4894c69b5943d543") + + invalid_objid = objects.MustParseObjectId("sha3-256:8888888888888888888888888888888888888888888888888888888888888888") + + missing_objid_corrupt_tree = objects.MustParseObjectId("sha3-256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") + + obj_corrupt_file_invalidhash = obj_emptyfile + objid_corrupt_file_invalidhash = objects.MustParseObjectId("sha3-256:8888888888888888888888888888888888888888888888888888888888888888") + + obj_corrupt_file_invalidserialization = []byte("file 9\nsize=123\n") + objid_corrupt_file_invalidserialization = objects.MustParseObjectId("sha3-256:7c3c1c331531a80d0e37a6066a6a4e4881eb897f1d76aeffd86a3bd96f0c054f") + + obj_corrupt_blob_lengthmismatch = []byte("blob 2\nx\n") + objid_corrupt_blob_lengthmismatch = objects.MustParseObjectId("sha3-256:c9f04ca8fb21c7abb6221060b4e2a332686d0f6be872bdeb85cdc5fe3f2743ca") + + obj_corrupt_file_lengthmismatch = []byte("" + + "file 88\n" + + "blob=sha3-256:c9f04ca8fb21c7abb6221060b4e2a332686d0f6be872bdeb85cdc5fe3f2743ca&size=100\n") + objid_corrupt_file_lengthmismatch = objects.MustParseObjectId("sha3-256:caea41322f4e02d68a15abe3a867c9ab507674a1f70abc892a162c5b3a742349") +) + +func TestCorrupted(t *testing.T) { + st := memory.NewMemoryStorage() + st.Set(objid_corrupt_snapshot_1, objects.OTSnapshot, obj_corrupt_snapshot_1) + st.Set(objid_corrupt_snapshot_2, objects.OTSnapshot, obj_corrupt_snapshot_2) + st.Set(objid_corrupt_tree, objects.OTTree, obj_corrupt_tree) + st.Set(objid_corrupt_file_invalidhash, objects.OTFile, obj_corrupt_file_invalidhash) + st.Set(objid_corrupt_file_invalidserialization, objects.OTFile, obj_corrupt_file_invalidserialization) + st.Set(objid_corrupt_blob_lengthmismatch, objects.OTBlob, obj_corrupt_blob_lengthmismatch) + st.Set(objid_corrupt_file_lengthmismatch, objects.OTFile, obj_corrupt_file_lengthmismatch) + + problems := make(chan FsckProblem) + var err error + go func() { + err = Fsck(st, nil, true, problems, logging.NewNopLog()) + close(problems) + }() + + var seen_snapshot_1_problem, + seen_invalidhash, + seen_invalidserialization, + seen_lengthmismatch, + seen_missing_file bool + + for p := range problems { + if p.Id.Equals(missing_objid_corrupt_snapshot_1) && p.ProblemType == FsckStorageError { + seen_snapshot_1_problem = true + continue + } + + if p.Id.Equals(invalid_objid) && p.ProblemType == FsckStorageError { + seen_invalidhash = true + continue + } + + if p.Id.Equals(objid_corrupt_file_invalidserialization) && p.ProblemType == FsckDeserializationError { + seen_invalidserialization = true + continue + } + + if p.Id.Equals(objid_corrupt_blob_lengthmismatch) && + p.ProblemType == FsckUnexpectedBlobSize && + p.HaveSize == 2 && p.WantSize == 100 { + + seen_lengthmismatch = true + continue + } + + if p.Id.Equals(missing_objid_corrupt_tree) && p.ProblemType == FsckStorageError { + seen_missing_file = true + continue + } + + t.Errorf("Unexpected problem: %s", p) + } + + if err != nil { + t.Errorf("Unexpected error: %s", err) + } + + if !seen_snapshot_1_problem { + t.Error("missing problem snapshot_1_problem") + } + if !seen_invalidhash { + t.Error("missing problem invalidhash") + } + if !seen_invalidserialization { + t.Error("missing problem invalidserialization") + } + if !seen_lengthmismatch { + t.Error("missing problem lengthmismatch") + } + if !seen_missing_file { + t.Error("missing problem missing_file") + } +} |