aboutsummaryrefslogtreecommitdiff
path: root/storage/cloud/cloudstorage.go
diff options
context:
space:
mode:
Diffstat (limited to 'storage/cloud/cloudstorage.go')
-rw-r--r--storage/cloud/cloudstorage.go183
1 files changed, 183 insertions, 0 deletions
diff --git a/storage/cloud/cloudstorage.go b/storage/cloud/cloudstorage.go
new file mode 100644
index 0000000..9ae8ee8
--- /dev/null
+++ b/storage/cloud/cloudstorage.go
@@ -0,0 +1,183 @@
+// Package cloud provides utilities to implement a petrific storage using a cloud-based object-storage (S3/Openstack Swift style)
+
+package cloud
+
+import (
+ "bytes"
+ "code.laria.me/petrific/config"
+ "code.laria.me/petrific/gpg"
+ "code.laria.me/petrific/objects"
+ "code.laria.me/petrific/storage"
+ "errors"
+ "fmt"
+ "math/rand"
+)
+
+type CloudStorage interface {
+ Get(key string) ([]byte, error)
+ Has(key string) (bool, error)
+ Put(key string, content []byte) error
+ Delete(key string) error
+ List(prefix string) ([]string, error)
+
+ Close() error
+}
+
+var (
+ NotFoundErr = errors.New("Object not found") // Cloud object could not be found
+)
+
+// Crypter provides de-/encrypting facilities for CloudBasedObjectStorage
+type Crypter interface {
+ Encrypt([]byte) ([]byte, error)
+ Decrypt([]byte) ([]byte, error)
+}
+
+// NopCrypter implements Crypter by not de/-encrypting at all
+type NopCrypter struct{}
+
+func (NopCrypter) Encrypt(in []byte) ([]byte, error) { return in, nil }
+func (NopCrypter) Decrypt(in []byte) ([]byte, error) { return in, nil }
+
+type CloudBasedObjectStorage struct {
+ CS CloudStorage
+ Prefix string
+ Crypter Crypter
+
+ index storage.Index
+}
+
+func (cbos CloudBasedObjectStorage) objidToKey(id objects.ObjectId) string {
+ return cbos.Prefix + "obj/" + id.String()
+}
+
+func (cbos CloudBasedObjectStorage) readIndex(name string) (storage.Index, error) {
+ index := storage.NewIndex()
+
+ b, err := cbos.CS.Get(name)
+ if err != nil {
+ return index, err
+ }
+
+ err = index.Load(bytes.NewReader(b))
+ return index, err
+}
+
+func (cbos *CloudBasedObjectStorage) Init() error {
+ cbos.index = storage.NewIndex()
+
+ // Load and combine all indexes, keep only the one with the "largest" name (see also Close())
+ index_names, err := cbos.CS.List(cbos.Prefix + "index/")
+ if err != nil {
+ return err
+ }
+
+ max_index := ""
+ for _, index_name := range index_names {
+ index, err := cbos.readIndex(index_name)
+ if err != nil {
+ return err
+ }
+
+ cbos.index.Combine(index)
+ }
+
+ for _, index_name := range index_names {
+ if index_name != max_index {
+ if err := cbos.CS.Delete(index_name); err != nil {
+ return err
+ }
+ }
+ }
+
+ return nil
+}
+
+func (cbos CloudBasedObjectStorage) Get(id objects.ObjectId) ([]byte, error) {
+ b, err := cbos.CS.Get(cbos.objidToKey(id))
+ if err != nil {
+ return []byte{}, err
+ }
+
+ return cbos.Crypter.Decrypt(b)
+}
+
+func (cbos CloudBasedObjectStorage) Has(id objects.ObjectId) (bool, error) {
+ return cbos.CS.Has(cbos.objidToKey(id))
+}
+
+func (cbos CloudBasedObjectStorage) Set(id objects.ObjectId, typ objects.ObjectType, raw []byte) error {
+ b, err := cbos.Crypter.Encrypt(raw)
+ if err != nil {
+ return err
+ }
+
+ if err := cbos.CS.Put(cbos.objidToKey(id), b); err != nil {
+ return err
+ }
+
+ // could be used to repopulate the index (not implemented yet)
+ if err := cbos.CS.Put(cbos.Prefix+"typeof/"+id.String(), []byte(typ)); err != nil {
+ return err
+ }
+
+ cbos.index.Set(id, typ)
+
+ return nil
+}
+
+func (cbos CloudBasedObjectStorage) List(typ objects.ObjectType) ([]objects.ObjectId, error) {
+ return cbos.index.List(typ), nil
+}
+
+func (cbos CloudBasedObjectStorage) Close() (outerr error) {
+ defer func() {
+ err := cbos.CS.Close()
+ if outerr == nil {
+ outerr = err
+ }
+ }()
+
+ // We need to adress the problem of parallel index creation here.
+ // We handle this by adding a random hex number to the index name.
+ // When loading the index, all "index/*" objects will be read and combined
+ // and all but the one with the largest number will be deleted.
+
+ buf := new(bytes.Buffer)
+ if outerr = cbos.index.Save(buf); outerr != nil {
+ return outerr
+ }
+
+ index_name := fmt.Sprintf("%sindex/%016x", cbos.Prefix, rand.Int63())
+ return cbos.CS.Put(index_name, buf.Bytes())
+}
+
+type cloudObjectStorageCreator func(conf config.Config, name string) (CloudStorage, error)
+
+func cloudStorageCreator(cloudCreator cloudObjectStorageCreator) storage.CreateStorageFromConfig {
+ return func(conf config.Config, name string) (storage.Storage, error) {
+ var cbos CloudBasedObjectStorage
+
+ storageconf := conf.Storage[name]
+
+ storageconf.Get("prefix", &cbos.Prefix)
+
+ encrypt_for := ""
+ storageconf.Get("encrypt_for", &encrypt_for)
+
+ if encrypt_for != "" {
+ cbos.Crypter = gpg.Crypter{
+ gpg.Encrypter{Key: encrypt_for},
+ gpg.Decrypter{},
+ }
+ }
+
+ var err error
+ if cbos.CS, err = cloudCreator(conf, name); err != nil {
+ return nil, err
+ }
+
+ err = cbos.Init()
+ return cbos, err
+ }
+}