-rw-r--r--  backup/fsck.go        294
-rw-r--r--  backup/fsck_test.go   157
-rw-r--r--  fsck.go                61
-rw-r--r--  main.go                 1
4 files changed, 513 insertions, 0 deletions
diff --git a/backup/fsck.go b/backup/fsck.go
new file mode 100644
index 0000000..e399d66
--- /dev/null
+++ b/backup/fsck.go
@@ -0,0 +1,294 @@
+package backup
+
+import (
+ "code.laria.me/petrific/logging"
+ "code.laria.me/petrific/objects"
+ "code.laria.me/petrific/storage"
+ "fmt"
+ "runtime"
+ "strings"
+ "sync"
+)
+
+type FsckProblemType int
+
+const (
+ FsckStorageError FsckProblemType = iota
+ FsckDeserializationError
+ FsckUnexpectedBlobSize
+)
+
+type FsckProblem struct {
+ Id objects.ObjectId
+ Ancestors []AncestorInfo
+ ProblemType FsckProblemType
+ Err error
+ WantSize, HaveSize int
+}
+
+func (problem FsckProblem) String() string {
+ desc := ""
+
+ switch problem.ProblemType {
+ case FsckStorageError:
+ desc = fmt.Sprintf("Failed retrieving object from storage: %s", problem.Err)
+ case FsckDeserializationError:
+ desc = fmt.Sprintf("Object could not be deserialized: %s", problem.Err)
+ case FsckUnexpectedBlobSize:
+ desc = fmt.Sprintf("Unexpected blob size: have %d, want %s", problem.HaveSize, problem.WantSize)
+ }
+
+ ancestors := make([]string, len(problem.Ancestors))
+ for i, a := range problem.Ancestors {
+ ancestors[i] = a.String()
+ }
+
+ return fmt.Sprintf("%s. Object %s (path: %s)", desc, problem.Id, strings.Join(ancestors, " / "))
+}
+
+type AncestorInfo struct {
+ Id objects.ObjectId
+ Type objects.ObjectType
+ Name string
+}
+
+func (a AncestorInfo) String() string {
+ if a.Name == "" {
+ return string(a.Type) + " " + a.Id.String()
+ } else {
+ return fmt.Sprintf("%s of %s %s", a.Name, a.Type, a.Id)
+ }
+}
+
+type queueElement struct {
+ Id objects.ObjectId
+ Ancestors []AncestorInfo
+ Extra interface{}
+}
+
+type fsckProcess struct {
+ st storage.Storage
+ blobs bool
+ problems chan<- FsckProblem
+ wait *sync.WaitGroup
+ queue chan queueElement
+ seen map[string]struct{}
+ seenLock sync.Locker
+ log *logging.Log
+}
+
+func (fsck fsckProcess) onlyUnseen(elems []queueElement) []queueElement {
+ fsck.seenLock.Lock()
+ defer fsck.seenLock.Unlock()
+
+ newElems := make([]queueElement, 0, len(elems))
+ for _, elem := range elems {
+ id := elem.Id.String()
+ _, ok := fsck.seen[id]
+ if !ok {
+ newElems = append(newElems, elem)
+
+ fsck.seen[id] = struct{}{}
+ }
+ fsck.log.Debug().Printf("seen %s? %t", id, ok)
+ }
+
+ return newElems
+}
+
+func (fsck fsckProcess) enqueue(elems []queueElement) {
+ fsck.log.Debug().Printf("enqueueing %d elements", len(elems))
+
+ elems = fsck.onlyUnseen(elems)
+
+ fsck.wait.Add(len(elems))
+ go func() {
+ for _, elem := range elems {
+ fsck.log.Debug().Printf("enqueueing %v", elem)
+ fsck.queue <- elem
+ }
+ }()
+}
+
+func (fsck fsckProcess) handle(elem queueElement) {
+ defer fsck.wait.Done()
+
+ rawobj, err := storage.GetObject(fsck.st, elem.Id)
+ if err != nil {
+ fsck.problems <- FsckProblem{
+ Id: elem.Id,
+ Ancestors: elem.Ancestors,
+ ProblemType: FsckStorageError,
+ Err: err,
+ }
+
+ return
+ }
+
+ obj, err := rawobj.Object()
+ if err != nil {
+ fsck.problems <- FsckProblem{
+ Id: elem.Id,
+ Ancestors: elem.Ancestors,
+ ProblemType: FsckDeserializationError,
+ Err: err,
+ }
+
+ return
+ }
+
+ switch obj.Type() {
+ case objects.OTBlob:
+ fsck.handleBlob(elem, obj.(*objects.Blob))
+ case objects.OTFile:
+ fsck.handleFile(elem, obj.(*objects.File))
+ case objects.OTTree:
+ fsck.handleTree(elem, obj.(objects.Tree))
+ case objects.OTSnapshot:
+ fsck.handleSnapshot(elem, obj.(*objects.Snapshot))
+ }
+}
+
+func (fsck fsckProcess) handleBlob(elem queueElement, obj *objects.Blob) {
+ if elem.Extra == nil {
+ return
+ }
+
+ want, ok := elem.Extra.(int)
+ if !ok {
+ return
+ }
+ have := len(*obj)
+
+ if have != want {
+ fsck.problems <- FsckProblem{
+ Id: elem.Id,
+ Ancestors: elem.Ancestors,
+ ProblemType: FsckUnexpectedBlobSize,
+ WantSize: want,
+ HaveSize: have,
+ }
+ }
+}
+
+func (fsck fsckProcess) handleFile(elem queueElement, obj *objects.File) {
+ if !fsck.blobs {
+ return
+ }
+
+ enqueue := make([]queueElement, 0, len(*obj))
+
+ for _, fragment := range *obj {
+ enqueue = append(enqueue, queueElement{
+ Id: fragment.Blob,
+ Ancestors: append(elem.Ancestors, AncestorInfo{
+ Id: elem.Id,
+ Type: objects.OTFile,
+ }),
+ Extra: int(fragment.Size),
+ })
+ }
+
+ fsck.enqueue(enqueue)
+}
+
+func (fsck fsckProcess) handleTree(elem queueElement, obj objects.Tree) {
+ ancestors := func(name string) []AncestorInfo {
+ return append(elem.Ancestors, AncestorInfo{
+ Id: elem.Id,
+ Type: objects.OTTree,
+ Name: name,
+ })
+ }
+
+ enqueue := make([]queueElement, 0, len(obj))
+
+ for name, entry := range obj {
+ switch entry.Type() {
+ case objects.TETDir:
+ enqueue = append(enqueue, queueElement{
+ Id: entry.(objects.TreeEntryDir).Ref,
+ Ancestors: ancestors(name),
+ })
+ case objects.TETFile:
+ enqueue = append(enqueue, queueElement{
+ Id: entry.(objects.TreeEntryFile).Ref,
+ Ancestors: ancestors(name),
+ })
+ }
+ }
+
+ fsck.enqueue(enqueue)
+}
+
+func (fsck fsckProcess) handleSnapshot(elem queueElement, obj *objects.Snapshot) {
+ fsck.enqueue([]queueElement{
+ {Id: obj.Tree},
+ })
+}
+
+func (fsck fsckProcess) worker(i int) {
+ for elem := range fsck.queue {
+ fsck.handle(elem)
+ }
+ fsck.log.Debug().Printf("stopping worker %d", i)
+}
+
+// Fsck checks the consistency of objects in a storage; any problems found are reported on the problems channel.
+func Fsck(
+ st storage.Storage,
+ start *objects.ObjectId,
+ blobs bool,
+ problems chan<- FsckProblem,
+ log *logging.Log,
+) error {
+ proc := fsckProcess{
+ st: st,
+ blobs: blobs,
+ problems: problems,
+ wait: new(sync.WaitGroup),
+ queue: make(chan queueElement),
+ seen: make(map[string]struct{}),
+ seenLock: new(sync.Mutex),
+ log: log,
+ }
+
+ enqueue := []queueElement{}
+
+ if start == nil {
+ types := []objects.ObjectType{
+ objects.OTFile,
+ objects.OTTree,
+ objects.OTSnapshot,
+ }
+ for _, t := range types {
+ ids, err := st.List(t)
+ if err != nil {
+ return err
+ }
+
+ for _, id := range ids {
+ enqueue = append(enqueue, queueElement{Id: id})
+ }
+ }
+ } else {
+ enqueue = []queueElement{
+ {Id: *start},
+ }
+ }
+
+ if len(enqueue) == 0 {
+ return nil
+ }
+
+ for i := 0; i < runtime.NumCPU(); i++ {
+ log.Debug().Printf("starting worker %d", i)
+ go proc.worker(i)
+ }
+
+ proc.enqueue(enqueue)
+
+ proc.wait.Wait()
+ close(proc.queue)
+ return nil
+}
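For orientation, here is a minimal caller sketch (not part of this commit; the in-memory store and nop logger are borrowed from the tests below and stand in for whatever storage and log a real caller has). Fsck blocks until the traversal finishes, so the problems channel has to be drained from another goroutine:

package main

import (
	"fmt"

	"code.laria.me/petrific/backup"
	"code.laria.me/petrific/logging"
	"code.laria.me/petrific/storage/memory"
)

func main() {
	st := memory.NewMemoryStorage()
	log := logging.NewNopLog()

	problems := make(chan backup.FsckProblem)
	var err error
	go func() {
		// start == nil: walk every file, tree and snapshot object in the storage.
		// blobs == true: also fetch blobs and verify their sizes.
		err = backup.Fsck(st, nil, true, problems, log)
		close(problems)
	}()

	for p := range problems {
		fmt.Println(p) // FsckProblem.String() includes the ancestor path
	}
	if err != nil {
		fmt.Println("fsck aborted:", err)
	}
}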
diff --git a/backup/fsck_test.go b/backup/fsck_test.go
new file mode 100644
index 0000000..f42dab3
--- /dev/null
+++ b/backup/fsck_test.go
@@ -0,0 +1,157 @@
+package backup
+
+import (
+ "code.laria.me/petrific/logging"
+ "code.laria.me/petrific/objects"
+ "code.laria.me/petrific/storage/memory"
+ "testing"
+)
+
+func TestHealthy(t *testing.T) {
+ st := memory.NewMemoryStorage()
+ st.Set(objid_emptyfile, objects.OTFile, obj_emptyfile)
+ st.Set(objid_fooblob, objects.OTBlob, obj_fooblob)
+ st.Set(objid_foofile, objects.OTFile, obj_foofile)
+ st.Set(objid_emptytree, objects.OTTree, obj_emptytree)
+ st.Set(objid_subtree, objects.OTTree, obj_subtree)
+ st.Set(objid_testtree, objects.OTTree, obj_testtree)
+
+ problems := make(chan FsckProblem)
+ var err error
+ go func() {
+ err = Fsck(st, nil, true, problems, logging.NewNopLog())
+ close(problems)
+ }()
+
+ for p := range problems {
+ t.Errorf("Unexpected problem: %s", p)
+ }
+
+ if err != nil {
+ t.Errorf("Unexpected error: %s", err)
+ }
+}
+
+var (
+ // snapshot with missing tree object
+ obj_corrupt_snapshot_1 = []byte("" +
+ "snapshot 162\n" +
+ "== BEGIN SNAPSHOT ==\n" +
+ "archive foo\n" +
+ "date 2018-01-06T22:42:00+01:00\n" +
+ "tree sha3-256:f000000000000000000000000000000000000000000000000000000000000000\n" +
+ "== END SNAPSHOT ==\n")
+ objid_corrupt_snapshot_1 = objects.MustParseObjectId("sha3-256:e33ad8ed4ef309099d593d249b36f2a5377dd26aeb18479695763fec514f519e")
+
+ missing_objid_corrupt_snapshot_1 = objects.MustParseObjectId("sha3-256:f000000000000000000000000000000000000000000000000000000000000000")
+
+ obj_corrupt_snapshot_2 = []byte("" +
+ "snapshot 162\n" +
+ "== BEGIN SNAPSHOT ==\n" +
+ "archive foo\n" +
+ "date 2018-01-06T22:45:00+01:00\n" +
+ "tree sha3-256:086f877d9e0760929c0a528ca3a01a7a19c03176a132cc6f4894c69b5943d543\n" +
+ "== END SNAPSHOT ==\n")
+ objid_corrupt_snapshot_2 = objects.MustParseObjectId("sha3-256:d5da78d96bb1bc7bff1f7cee509dba084b54ff4b9af0ed23a6a14437765ac81f")
+
+ obj_corrupt_tree = []byte("tree 431\n" +
+ "name=invalidhash&ref=sha3-256:8888888888888888888888888888888888888888888888888888888888888888&type=file\n" +
+ "name=invalidserialization&ref=sha3-256:7c3c1c331531a80d0e37a6066a6a4e4881eb897f1d76aeffd86a3bd96f0c054f&type=file\n" +
+ "name=lengthmismatch&ref=sha3-256:caea41322f4e02d68a15abe3a867c9ab507674a1f70abc892a162c5b3a742349&type=file\n" +
+ "name=missingobj&ref=sha3-256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff&type=file\n")
+ objid_corrupt_tree = objects.MustParseObjectId("sha3-256:086f877d9e0760929c0a528ca3a01a7a19c03176a132cc6f4894c69b5943d543")
+
+ invalid_objid = objects.MustParseObjectId("sha3-256:8888888888888888888888888888888888888888888888888888888888888888")
+
+ missing_objid_corrupt_tree = objects.MustParseObjectId("sha3-256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
+
+ obj_corrupt_file_invalidhash = obj_emptyfile
+ objid_corrupt_file_invalidhash = objects.MustParseObjectId("sha3-256:8888888888888888888888888888888888888888888888888888888888888888")
+
+ obj_corrupt_file_invalidserialization = []byte("file 9\nsize=123\n")
+ objid_corrupt_file_invalidserialization = objects.MustParseObjectId("sha3-256:7c3c1c331531a80d0e37a6066a6a4e4881eb897f1d76aeffd86a3bd96f0c054f")
+
+ obj_corrupt_blob_lengthmismatch = []byte("blob 2\nx\n")
+ objid_corrupt_blob_lengthmismatch = objects.MustParseObjectId("sha3-256:c9f04ca8fb21c7abb6221060b4e2a332686d0f6be872bdeb85cdc5fe3f2743ca")
+
+ obj_corrupt_file_lengthmismatch = []byte("" +
+ "file 88\n" +
+ "blob=sha3-256:c9f04ca8fb21c7abb6221060b4e2a332686d0f6be872bdeb85cdc5fe3f2743ca&size=100\n")
+ objid_corrupt_file_lengthmismatch = objects.MustParseObjectId("sha3-256:caea41322f4e02d68a15abe3a867c9ab507674a1f70abc892a162c5b3a742349")
+)
+
+func TestCorrupted(t *testing.T) {
+ st := memory.NewMemoryStorage()
+ st.Set(objid_corrupt_snapshot_1, objects.OTSnapshot, obj_corrupt_snapshot_1)
+ st.Set(objid_corrupt_snapshot_2, objects.OTSnapshot, obj_corrupt_snapshot_2)
+ st.Set(objid_corrupt_tree, objects.OTTree, obj_corrupt_tree)
+ st.Set(objid_corrupt_file_invalidhash, objects.OTFile, obj_corrupt_file_invalidhash)
+ st.Set(objid_corrupt_file_invalidserialization, objects.OTFile, obj_corrupt_file_invalidserialization)
+ st.Set(objid_corrupt_blob_lengthmismatch, objects.OTBlob, obj_corrupt_blob_lengthmismatch)
+ st.Set(objid_corrupt_file_lengthmismatch, objects.OTFile, obj_corrupt_file_lengthmismatch)
+
+ problems := make(chan FsckProblem)
+ var err error
+ go func() {
+ err = Fsck(st, nil, true, problems, logging.NewNopLog())
+ close(problems)
+ }()
+
+ var seen_snapshot_1_problem,
+ seen_invalidhash,
+ seen_invalidserialization,
+ seen_lengthmismatch,
+ seen_missing_file bool
+
+ for p := range problems {
+ if p.Id.Equals(missing_objid_corrupt_snapshot_1) && p.ProblemType == FsckStorageError {
+ seen_snapshot_1_problem = true
+ continue
+ }
+
+ if p.Id.Equals(invalid_objid) && p.ProblemType == FsckStorageError {
+ seen_invalidhash = true
+ continue
+ }
+
+ if p.Id.Equals(objid_corrupt_file_invalidserialization) && p.ProblemType == FsckDeserializationError {
+ seen_invalidserialization = true
+ continue
+ }
+
+ if p.Id.Equals(objid_corrupt_blob_lengthmismatch) &&
+ p.ProblemType == FsckUnexpectedBlobSize &&
+ p.HaveSize == 2 && p.WantSize == 100 {
+
+ seen_lengthmismatch = true
+ continue
+ }
+
+ if p.Id.Equals(missing_objid_corrupt_tree) && p.ProblemType == FsckStorageError {
+ seen_missing_file = true
+ continue
+ }
+
+ t.Errorf("Unexpected problem: %s", p)
+ }
+
+ if err != nil {
+ t.Errorf("Unexpected error: %s", err)
+ }
+
+ if !seen_snapshot_1_problem {
+ t.Error("missing problem snapshot_1_problem")
+ }
+ if !seen_invalidhash {
+ t.Error("missing problem invalidhash")
+ }
+ if !seen_invalidserialization {
+ t.Error("missing problem invalidserialization")
+ }
+ if !seen_lengthmismatch {
+ t.Error("missing problem lengthmismatch")
+ }
+ if !seen_missing_file {
+ t.Error("missing problem missing_file")
+ }
+}
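The objid_*/obj_* fixtures referenced by TestHealthy are the shared healthy test objects defined elsewhere in this package's tests; they are not part of this diff. Both tests can be run with the usual tooling, e.g. go test ./backup/.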
diff --git a/fsck.go b/fsck.go
new file mode 100644
index 0000000..b3ef48d
--- /dev/null
+++ b/fsck.go
@@ -0,0 +1,61 @@
+package main
+
+import (
+ "code.laria.me/petrific/backup"
+ "code.laria.me/petrific/objects"
+ "flag"
+ "os"
+)
+
+func Fsck(env *Env, args []string) int {
+ flags := flag.NewFlagSet(os.Args[0]+" fsck", flag.ContinueOnError)
+ full := flags.Bool("full", false, "also check blobs")
+
+ flags.Usage = subcmdUsage("fsck", "[flags] [object-id]", flags)
+ errout := subcmdErrout(env.Log, "fsck")
+
+ if err := flags.Parse(args); err != nil {
+ errout(err)
+ return 2
+ }
+
+ var fsck_id *objects.ObjectId = nil
+
+ if len(flags.Args()) > 1 {
+ id, err := objects.ParseObjectId(flags.Args()[0])
+ if err != nil {
+ env.Log.Error().Printf("Could not parse object id: %s", err)
+ return 1
+ }
+
+ fsck_id = &id
+ }
+
+ env.Log.Debug().Printf("id: %v", fsck_id)
+
+ problems := make(chan backup.FsckProblem)
+ var err error
+ go func() {
+ err = backup.Fsck(env.Store, fsck_id, *full, problems, env.Log)
+ close(problems)
+ }()
+
+ problems_found := false
+ for p := range problems {
+ env.Log.Warn().Print(p)
+ problems_found = true
+ }
+
+ if problems_found {
+ env.Log.Error().Print("Problems found. See warnings in the log")
+ }
+
+ if err != nil {
+ env.Log.Error().Print(err)
+ }
+
+ if err != nil || problems_found {
+ return 1
+ }
+ return 0
+}
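Once the subcommand is registered in main.go below, the check is invoked as (binary name assumed from the module path): petrific fsck [-full] [object-id]. Without an object id, every file, tree and snapshot object in the configured storage is checked; the exit code is 1 if problems or an error were encountered and 2 on flag-parsing errors.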
diff --git a/main.go b/main.go
index c567403..94a3415 100644
--- a/main.go
+++ b/main.go
@@ -16,6 +16,7 @@ var subcmds = map[string]subcmd{
"create-snapshot": CreateSnapshot,
"list-snapshots": ListSnapshots,
"restore-snapshot": RestoreSnapshot,
+ "fsck": Fsck,
}

func subcmdUsage(name string, usage string, flags *flag.FlagSet) func() {