From 11e13d63c0f3d0763c78d89dedc7adaa80f654db Mon Sep 17 00:00:00 2001
From: Laria Carolin Chabowski <laria@laria.me>
Date: Sun, 7 Jan 2018 00:20:12 +0100
Subject: Add fsck command

---
 backup/fsck.go      | 294 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 backup/fsck_test.go | 157 ++++++++++++++++++++++++++++
 fsck.go             |  61 +++++++++++
 main.go             |   1 +
 4 files changed, 513 insertions(+)
 create mode 100644 backup/fsck.go
 create mode 100644 backup/fsck_test.go
 create mode 100644 fsck.go

diff --git a/backup/fsck.go b/backup/fsck.go
new file mode 100644
index 0000000..e399d66
--- /dev/null
+++ b/backup/fsck.go
@@ -0,0 +1,294 @@
+package backup
+
+import (
+	"code.laria.me/petrific/logging"
+	"code.laria.me/petrific/objects"
+	"code.laria.me/petrific/storage"
+	"fmt"
+	"runtime"
+	"strings"
+	"sync"
+)
+
+// FsckProblemType classifies the kind of problem Fsck found with an object.
+type FsckProblemType int
+
+const (
+	// FsckStorageError: the object could not be retrieved from storage.
+	FsckStorageError FsckProblemType = iota
+	// FsckDeserializationError: the object's raw data could not be
+	// parsed into an object.
+	FsckDeserializationError
+	// FsckUnexpectedBlobSize: a blob's actual size differs from the
+	// size recorded in the file object referencing it.
+	FsckUnexpectedBlobSize
+)
+
+// FsckProblem describes one problem found during an Fsck run.
+type FsckProblem struct {
+	// Id of the affected object.
+	Id                 objects.ObjectId
+	// Ancestors is the chain of objects through which the affected
+	// object was reached during traversal.
+	Ancestors          []AncestorInfo
+	ProblemType        FsckProblemType
+	// Err is set for FsckStorageError and FsckDeserializationError.
+	Err                error
+	// WantSize/HaveSize are set for FsckUnexpectedBlobSize.
+	WantSize, HaveSize int
+}
+
+func (problem FsckProblem) String() string {
+	desc := ""
+
+	switch problem.ProblemType {
+	case FsckStorageError:
+		desc = fmt.Sprintf("Failed retrieving object from storage: %s", problem.Err)
+	case FsckDeserializationError:
+		desc = fmt.Sprintf("Object could not be deserialized: %s", problem.Err)
+	case FsckUnexpectedBlobSize:
+		desc = fmt.Sprintf("Unexpected blob size: have %d, want %s", problem.HaveSize, problem.WantSize)
+	}
+
+	ancestors := make([]string, len(problem.Ancestors))
+	for i, a := range problem.Ancestors {
+		ancestors[i] = a.String()
+	}
+
+	return fmt.Sprintf("%s. Object %s (path: %s)", desc, problem.Id, strings.Join(ancestors, " / "))
+}
+
+type AncestorInfo struct {
+	Id   objects.ObjectId
+	Type objects.ObjectType
+	Name string
+}
+
+func (a AncestorInfo) String() string {
+	if a.Name == "" {
+		return string(a.Type) + " " + a.Id.String()
+	} else {
+		return fmt.Sprintf("%s of %s %s", a.Name, a.Type, a.Id)
+	}
+}
+
+// queueElement is one unit of work for the fsck workers: an object id
+// to check, the ancestor path it was reached by, and optional type
+// specific data (currently the expected blob size, as an int, for
+// blobs reached via a file object).
+type queueElement struct {
+	Id        objects.ObjectId
+	Ancestors []AncestorInfo
+	Extra     interface{}
+}
+
+// fsckProcess bundles the shared state of one Fsck run.
+type fsckProcess struct {
+	st       storage.Storage
+	blobs    bool // also retrieve and size-check blobs?
+	problems chan<- FsckProblem
+	wait     *sync.WaitGroup // counts enqueued-but-unfinished elements
+	queue    chan queueElement
+	seen     map[string]struct{} // object ids (stringified) already enqueued
+	seenLock sync.Locker         // guards seen
+	log      *logging.Log
+}
+
+func (fsck fsckProcess) onlyUnseen(elems []queueElement) []queueElement {
+	fsck.seenLock.Lock()
+	defer fsck.seenLock.Unlock()
+
+	newElems := make([]queueElement, 0, len(elems))
+	for _, elem := range elems {
+		id := elem.Id.String()
+		_, ok := fsck.seen[id]
+		if !ok {
+			newElems = append(newElems, elem)
+
+			fsck.seen[id] = struct{}{}
+		}
+		fsck.log.Debug().Printf("seen %s? %t", id, ok)
+	}
+
+	return newElems
+}
+
+func (fsck fsckProcess) enqueue(elems []queueElement) {
+	fsck.log.Debug().Printf("enqueueing %d elements", len(elems))
+
+	elems = fsck.onlyUnseen(elems)
+
+	fsck.wait.Add(len(elems))
+	go func() {
+		for _, elem := range elems {
+			fsck.log.Debug().Printf("enqueueing %v", elem)
+			fsck.queue <- elem
+		}
+	}()
+}
+
+func (fsck fsckProcess) handle(elem queueElement) {
+	defer fsck.wait.Done()
+
+	rawobj, err := storage.GetObject(fsck.st, elem.Id)
+	if err != nil {
+		fsck.problems <- FsckProblem{
+			Id:          elem.Id,
+			Ancestors:   elem.Ancestors,
+			ProblemType: FsckStorageError,
+			Err:         err,
+		}
+
+		return
+	}
+
+	obj, err := rawobj.Object()
+	if err != nil {
+		fsck.problems <- FsckProblem{
+			Id:          elem.Id,
+			Ancestors:   elem.Ancestors,
+			ProblemType: FsckDeserializationError,
+			Err:         err,
+		}
+
+		return
+	}
+
+	switch obj.Type() {
+	case objects.OTBlob:
+		fsck.handleBlob(elem, obj.(*objects.Blob))
+	case objects.OTFile:
+		fsck.handleFile(elem, obj.(*objects.File))
+	case objects.OTTree:
+		fsck.handleTree(elem, obj.(objects.Tree))
+	case objects.OTSnapshot:
+		fsck.handleSnapshot(elem, obj.(*objects.Snapshot))
+	}
+}
+
+func (fsck fsckProcess) handleBlob(elem queueElement, obj *objects.Blob) {
+	if elem.Extra == nil {
+		return
+	}
+
+	want, ok := elem.Extra.(int)
+	if !ok {
+		return
+	}
+	have := len(*obj)
+
+	if have != want {
+		fsck.problems <- FsckProblem{
+			Id:          elem.Id,
+			Ancestors:   elem.Ancestors,
+			ProblemType: FsckUnexpectedBlobSize,
+			WantSize:    want,
+			HaveSize:    have,
+		}
+	}
+}
+
+func (fsck fsckProcess) handleFile(elem queueElement, obj *objects.File) {
+	if !fsck.blobs {
+		return
+	}
+
+	enqueue := make([]queueElement, 0, len(*obj))
+
+	for _, fragment := range *obj {
+		enqueue = append(enqueue, queueElement{
+			Id: fragment.Blob,
+			Ancestors: append(elem.Ancestors, AncestorInfo{
+				Id:   elem.Id,
+				Type: objects.OTFile,
+			}),
+			Extra: int(fragment.Size),
+		})
+	}
+
+	fsck.enqueue(enqueue)
+}
+
+func (fsck fsckProcess) handleTree(elem queueElement, obj objects.Tree) {
+	ancestors := func(name string) []AncestorInfo {
+		return append(elem.Ancestors, AncestorInfo{
+			Id:   elem.Id,
+			Type: objects.OTTree,
+			Name: name,
+		})
+	}
+
+	enqueue := make([]queueElement, 0, len(obj))
+
+	for name, entry := range obj {
+		switch entry.Type() {
+		case objects.TETDir:
+			enqueue = append(enqueue, queueElement{
+				Id:        entry.(objects.TreeEntryDir).Ref,
+				Ancestors: ancestors(name),
+			})
+		case objects.TETFile:
+			enqueue = append(enqueue, queueElement{
+				Id:        entry.(objects.TreeEntryFile).Ref,
+				Ancestors: ancestors(name),
+			})
+		}
+	}
+
+	fsck.enqueue(enqueue)
+}
+
+func (fsck fsckProcess) handleSnapshot(elem queueElement, obj *objects.Snapshot) {
+	fsck.enqueue([]queueElement{
+		{Id: obj.Tree},
+	})
+}
+
+func (fsck fsckProcess) worker(i int) {
+	for elem := range fsck.queue {
+		fsck.handle(elem)
+	}
+	fsck.log.Debug().Printf("stopping worker %d", i)
+}
+
+// Fsck checks the consistency of objects in a storage.
+//
+// If start is nil, all file, tree and snapshot objects listed by the
+// storage are checked; otherwise only the objects reachable from
+// *start. If blobs is true, blobs referenced by file objects are
+// retrieved as well and their sizes verified.
+//
+// Problems found are sent to the problems channel, which the caller
+// must consume concurrently. Fsck returns early with an error if
+// listing the storage fails.
+func Fsck(
+	st storage.Storage,
+	start *objects.ObjectId,
+	blobs bool,
+	problems chan<- FsckProblem,
+	log *logging.Log,
+) error {
+	proc := fsckProcess{
+		st:       st,
+		blobs:    blobs,
+		problems: problems,
+		wait:     new(sync.WaitGroup),
+		queue:    make(chan queueElement),
+		seen:     make(map[string]struct{}),
+		seenLock: new(sync.Mutex),
+		log:      log,
+	}
+
+	enqueue := []queueElement{}
+
+	if start == nil {
+		// No start object: seed the queue with every listed object.
+		// Blobs are not listed here, they are only checked via the file
+		// objects referencing them (see handleFile).
+		types := []objects.ObjectType{
+			objects.OTFile,
+			objects.OTTree,
+			objects.OTSnapshot,
+		}
+		for _, t := range types {
+			ids, err := st.List(t)
+			if err != nil {
+				return err
+			}
+
+			for _, id := range ids {
+				enqueue = append(enqueue, queueElement{Id: id})
+			}
+		}
+	} else {
+		enqueue = []queueElement{
+			{Id: *start},
+		}
+	}
+
+	if len(enqueue) == 0 {
+		// Nothing to do. No workers were started yet, so returning here
+		// leaks no goroutines.
+		return nil
+	}
+
+	// Start one worker per CPU; they run until queue is closed below.
+	for i := 0; i < runtime.NumCPU(); i++ {
+		log.Debug().Printf("starting worker %d", i)
+		go proc.worker(i)
+	}
+
+	proc.enqueue(enqueue)
+
+	// wait reaches zero once every enqueued element was handled
+	// (elements are Add()ed before being sent to the queue), after
+	// which the queue can be closed to stop the workers.
+	proc.wait.Wait()
+	close(proc.queue)
+	return nil
+}
diff --git a/backup/fsck_test.go b/backup/fsck_test.go
new file mode 100644
index 0000000..f42dab3
--- /dev/null
+++ b/backup/fsck_test.go
@@ -0,0 +1,157 @@
+package backup
+
+import (
+	"code.laria.me/petrific/logging"
+	"code.laria.me/petrific/objects"
+	"code.laria.me/petrific/storage/memory"
+	"testing"
+)
+
+// TestHealthy runs Fsck over a storage populated with consistent
+// fixture objects (the obj_*/objid_* values are presumably defined in
+// other test files of this package — not visible here) and expects no
+// problems to be reported.
+func TestHealthy(t *testing.T) {
+	st := memory.NewMemoryStorage()
+	st.Set(objid_emptyfile, objects.OTFile, obj_emptyfile)
+	st.Set(objid_fooblob, objects.OTBlob, obj_fooblob)
+	st.Set(objid_foofile, objects.OTFile, obj_foofile)
+	st.Set(objid_emptytree, objects.OTTree, obj_emptytree)
+	st.Set(objid_subtree, objects.OTTree, obj_subtree)
+	st.Set(objid_testtree, objects.OTTree, obj_testtree)
+
+	problems := make(chan FsckProblem)
+	var err error
+	go func() {
+		// Run Fsck in the background; closing problems afterwards ends
+		// the range loop below.
+		err = Fsck(st, nil, true, problems, logging.NewNopLog())
+		close(problems)
+	}()
+
+	for p := range problems {
+		t.Errorf("Unexpected problem: %s", p)
+	}
+
+	// err is only read after problems was closed, i.e. after the
+	// goroutine wrote it, so this access is race free.
+	if err != nil {
+		t.Errorf("Unexpected error: %s", err)
+	}
+}
+
+// Deliberately broken fixture objects for TestCorrupted.
+var (
+	// snapshot with missing tree object
+	obj_corrupt_snapshot_1 = []byte("" +
+		"snapshot 162\n" +
+		"== BEGIN SNAPSHOT ==\n" +
+		"archive foo\n" +
+		"date 2018-01-06T22:42:00+01:00\n" +
+		"tree sha3-256:f000000000000000000000000000000000000000000000000000000000000000\n" +
+		"== END SNAPSHOT ==\n")
+	objid_corrupt_snapshot_1 = objects.MustParseObjectId("sha3-256:e33ad8ed4ef309099d593d249b36f2a5377dd26aeb18479695763fec514f519e")
+
+	// the tree id referenced by obj_corrupt_snapshot_1 that is absent
+	// from the storage
+	missing_objid_corrupt_snapshot_1 = objects.MustParseObjectId("sha3-256:f000000000000000000000000000000000000000000000000000000000000000")
+
+	// snapshot referencing objid_corrupt_tree, which in turn contains
+	// several broken entries
+	obj_corrupt_snapshot_2 = []byte("" +
+		"snapshot 162\n" +
+		"== BEGIN SNAPSHOT ==\n" +
+		"archive foo\n" +
+		"date 2018-01-06T22:45:00+01:00\n" +
+		"tree sha3-256:086f877d9e0760929c0a528ca3a01a7a19c03176a132cc6f4894c69b5943d543\n" +
+		"== END SNAPSHOT ==\n")
+	objid_corrupt_snapshot_2 = objects.MustParseObjectId("sha3-256:d5da78d96bb1bc7bff1f7cee509dba084b54ff4b9af0ed23a6a14437765ac81f")
+
+	// tree with four broken file entries: an id whose hash does not
+	// match the stored content, a file that cannot be deserialized, a
+	// file whose blob size does not match, and a missing object
+	obj_corrupt_tree = []byte("tree 431\n" +
+		"name=invalidhash&ref=sha3-256:8888888888888888888888888888888888888888888888888888888888888888&type=file\n" +
+		"name=invalidserialization&ref=sha3-256:7c3c1c331531a80d0e37a6066a6a4e4881eb897f1d76aeffd86a3bd96f0c054f&type=file\n" +
+		"name=lengthmismatch&ref=sha3-256:caea41322f4e02d68a15abe3a867c9ab507674a1f70abc892a162c5b3a742349&type=file\n" +
+		"name=missingobj&ref=sha3-256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff&type=file\n")
+	objid_corrupt_tree = objects.MustParseObjectId("sha3-256:086f877d9e0760929c0a528ca3a01a7a19c03176a132cc6f4894c69b5943d543")
+
+	// id under which obj_corrupt_file_invalidhash is stored; the hash
+	// does not match the content
+	invalid_objid = objects.MustParseObjectId("sha3-256:8888888888888888888888888888888888888888888888888888888888888888")
+
+	// file entry in obj_corrupt_tree that is absent from the storage
+	missing_objid_corrupt_tree = objects.MustParseObjectId("sha3-256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
+
+	obj_corrupt_file_invalidhash   = obj_emptyfile
+	objid_corrupt_file_invalidhash = objects.MustParseObjectId("sha3-256:8888888888888888888888888888888888888888888888888888888888888888")
+
+	// file object whose payload is not valid file serialization
+	obj_corrupt_file_invalidserialization   = []byte("file 9\nsize=123\n")
+	objid_corrupt_file_invalidserialization = objects.MustParseObjectId("sha3-256:7c3c1c331531a80d0e37a6066a6a4e4881eb897f1d76aeffd86a3bd96f0c054f")
+
+	// blob of size 2, referenced below with a claimed size of 100
+	obj_corrupt_blob_lengthmismatch   = []byte("blob 2\nx\n")
+	objid_corrupt_blob_lengthmismatch = objects.MustParseObjectId("sha3-256:c9f04ca8fb21c7abb6221060b4e2a332686d0f6be872bdeb85cdc5fe3f2743ca")
+
+	// file object claiming its blob fragment has size 100 (it has 2)
+	obj_corrupt_file_lengthmismatch = []byte("" +
+		"file 88\n" +
+		"blob=sha3-256:c9f04ca8fb21c7abb6221060b4e2a332686d0f6be872bdeb85cdc5fe3f2743ca&size=100\n")
+	objid_corrupt_file_lengthmismatch = objects.MustParseObjectId("sha3-256:caea41322f4e02d68a15abe3a867c9ab507674a1f70abc892a162c5b3a742349")
+)
+
+// TestCorrupted runs Fsck over a storage populated with the broken
+// fixture objects above and checks that exactly the expected problems
+// (and no others) are reported.
+func TestCorrupted(t *testing.T) {
+	st := memory.NewMemoryStorage()
+	st.Set(objid_corrupt_snapshot_1, objects.OTSnapshot, obj_corrupt_snapshot_1)
+	st.Set(objid_corrupt_snapshot_2, objects.OTSnapshot, obj_corrupt_snapshot_2)
+	st.Set(objid_corrupt_tree, objects.OTTree, obj_corrupt_tree)
+	st.Set(objid_corrupt_file_invalidhash, objects.OTFile, obj_corrupt_file_invalidhash)
+	st.Set(objid_corrupt_file_invalidserialization, objects.OTFile, obj_corrupt_file_invalidserialization)
+	st.Set(objid_corrupt_blob_lengthmismatch, objects.OTBlob, obj_corrupt_blob_lengthmismatch)
+	st.Set(objid_corrupt_file_lengthmismatch, objects.OTFile, obj_corrupt_file_lengthmismatch)
+
+	problems := make(chan FsckProblem)
+	var err error
+	go func() {
+		err = Fsck(st, nil, true, problems, logging.NewNopLog())
+		close(problems)
+	}()
+
+	// One flag per expected problem; anything not matched by a case
+	// below is reported as unexpected.
+	var seen_snapshot_1_problem,
+		seen_invalidhash,
+		seen_invalidserialization,
+		seen_lengthmismatch,
+		seen_missing_file bool
+
+	for p := range problems {
+		if p.Id.Equals(missing_objid_corrupt_snapshot_1) && p.ProblemType == FsckStorageError {
+			seen_snapshot_1_problem = true
+			continue
+		}
+
+		if p.Id.Equals(invalid_objid) && p.ProblemType == FsckStorageError {
+			seen_invalidhash = true
+			continue
+		}
+
+		if p.Id.Equals(objid_corrupt_file_invalidserialization) && p.ProblemType == FsckDeserializationError {
+			seen_invalidserialization = true
+			continue
+		}
+
+		if p.Id.Equals(objid_corrupt_blob_lengthmismatch) &&
+			p.ProblemType == FsckUnexpectedBlobSize &&
+			p.HaveSize == 2 && p.WantSize == 100 {
+
+			seen_lengthmismatch = true
+			continue
+		}
+
+		if p.Id.Equals(missing_objid_corrupt_tree) && p.ProblemType == FsckStorageError {
+			seen_missing_file = true
+			continue
+		}
+
+		t.Errorf("Unexpected problem: %s", p)
+	}
+
+	// err was written before problems was closed, so reading it here
+	// is race free.
+	if err != nil {
+		t.Errorf("Unexpected error: %s", err)
+	}
+
+	if !seen_snapshot_1_problem {
+		t.Error("missing problem snapshot_1_problem")
+	}
+	if !seen_invalidhash {
+		t.Error("missing problem invalidhash")
+	}
+	if !seen_invalidserialization {
+		t.Error("missing problem invalidserialization")
+	}
+	if !seen_lengthmismatch {
+		t.Error("missing problem lengthmismatch")
+	}
+	if !seen_missing_file {
+		t.Error("missing problem missing_file")
+	}
+}
diff --git a/fsck.go b/fsck.go
new file mode 100644
index 0000000..b3ef48d
--- /dev/null
+++ b/fsck.go
@@ -0,0 +1,61 @@
+package main
+
+import (
+	"code.laria.me/petrific/backup"
+	"code.laria.me/petrific/objects"
+	"flag"
+	"os"
+)
+
+func Fsck(env *Env, args []string) int {
+	flags := flag.NewFlagSet(os.Args[0]+" fsck", flag.ContinueOnError)
+	full := flags.Bool("full", false, "also check blobs")
+
+	flags.Usage = subcmdUsage("fsck", "[flags] [object-id]", flags)
+	errout := subcmdErrout(env.Log, "fsck")
+
+	if err := flags.Parse(args); err != nil {
+		errout(err)
+		return 2
+	}
+
+	var fsck_id *objects.ObjectId = nil
+
+	if len(flags.Args()) > 1 {
+		id, err := objects.ParseObjectId(flags.Args()[0])
+		if err != nil {
+			env.Log.Error().Printf("Could not parse object id: %s", err)
+			return 1
+		}
+
+		fsck_id = &id
+	}
+
+	env.Log.Debug().Printf("id: %v", fsck_id)
+
+	problems := make(chan backup.FsckProblem)
+	var err error
+	go func() {
+		err = backup.Fsck(env.Store, fsck_id, *full, problems, env.Log)
+		close(problems)
+	}()
+
+	problems_found := false
+	for p := range problems {
+		env.Log.Warn().Print(p)
+		problems_found = true
+	}
+
+	if problems_found {
+		env.Log.Error().Print("Problems found. See warnings in the log")
+	}
+
+	if err != nil {
+		env.Log.Error().Print(err)
+	}
+
+	if err != nil || problems_found {
+		return 1
+	}
+	return 0
+}
diff --git a/main.go b/main.go
index c567403..94a3415 100644
--- a/main.go
+++ b/main.go
@@ -16,6 +16,7 @@ var subcmds = map[string]subcmd{
 	"create-snapshot":  CreateSnapshot,
 	"list-snapshots":   ListSnapshots,
 	"restore-snapshot": RestoreSnapshot,
+	"fsck":             Fsck,
 }
 
 func subcmdUsage(name string, usage string, flags *flag.FlagSet) func() {
-- 
cgit v1.2.3-70-g09d2