diff options
author | Laria Carolin Chabowski <laria@laria.me> | 2017-09-26 21:33:50 +0200 |
---|---|---|
committer | Laria Carolin Chabowski <laria@laria.me> | 2017-10-03 15:01:38 +0200 |
commit | b48f1f2c372d1fa7bca155c449c590d852a03511 (patch) | |
tree | 29d468067d6381fdb3a810c9f05dde8daccc06df /storage | |
parent | b2742dc28d1ff9001cd784455bbdf9cf29539c30 (diff) | |
download | petrific-b48f1f2c372d1fa7bca155c449c590d852a03511.tar.gz petrific-b48f1f2c372d1fa7bca155c449c590d852a03511.tar.bz2 petrific-b48f1f2c372d1fa7bca155c449c590d852a03511.zip |
Add cloudstorage support
For now Openstack Swift is supported. But it should be pretty easy to
implement other Swift / S3 like object storages.
Diffstat (limited to 'storage')
-rw-r--r-- | storage/cloud/cloudstorage.go | 183 | ||||
-rw-r--r-- | storage/cloud/swift.go | 118 | ||||
-rw-r--r-- | storage/index.go | 8 | ||||
-rw-r--r-- | storage/registry/registry.go | 13 | ||||
-rw-r--r-- | storage/storage.go | 5 |
5 files changed, 322 insertions, 5 deletions
diff --git a/storage/cloud/cloudstorage.go b/storage/cloud/cloudstorage.go new file mode 100644 index 0000000..9ae8ee8 --- /dev/null +++ b/storage/cloud/cloudstorage.go @@ -0,0 +1,183 @@ +// Package cloud provides utilities to implement a petrific storage using a cloud-based object-storage (S3/Openstack Swift style) + +package cloud + +import ( + "bytes" + "code.laria.me/petrific/config" + "code.laria.me/petrific/gpg" + "code.laria.me/petrific/objects" + "code.laria.me/petrific/storage" + "errors" + "fmt" + "math/rand" +) + +type CloudStorage interface { + Get(key string) ([]byte, error) + Has(key string) (bool, error) + Put(key string, content []byte) error + Delete(key string) error + List(prefix string) ([]string, error) + + Close() error +} + +var ( + NotFoundErr = errors.New("Object not found") // Cloud object could not be found +) + +// Crypter provides de-/encrypting facilities for CloudBasedObjectStorage +type Crypter interface { + Encrypt([]byte) ([]byte, error) + Decrypt([]byte) ([]byte, error) +} + +// NopCrypter implements Crypter by not de/-encrypting at all +type NopCrypter struct{} + +func (NopCrypter) Encrypt(in []byte) ([]byte, error) { return in, nil } +func (NopCrypter) Decrypt(in []byte) ([]byte, error) { return in, nil } + +type CloudBasedObjectStorage struct { + CS CloudStorage + Prefix string + Crypter Crypter + + index storage.Index +} + +func (cbos CloudBasedObjectStorage) objidToKey(id objects.ObjectId) string { + return cbos.Prefix + "obj/" + id.String() +} + +func (cbos CloudBasedObjectStorage) readIndex(name string) (storage.Index, error) { + index := storage.NewIndex() + + b, err := cbos.CS.Get(name) + if err != nil { + return index, err + } + + err = index.Load(bytes.NewReader(b)) + return index, err +} + +func (cbos *CloudBasedObjectStorage) Init() error { + cbos.index = storage.NewIndex() + + // Load and combine all indexes, keep only the one with the "largest" name (see also Close()) + index_names, err := cbos.CS.List(cbos.Prefix + "index/") + if err != nil { + return err + } + + max_index := "" + for _, index_name := range index_names { + index, err := cbos.readIndex(index_name) + if err != nil { + return err + } + + cbos.index.Combine(index) + } + + for _, index_name := range index_names { + if index_name != max_index { + if err := cbos.CS.Delete(index_name); err != nil { + return err + } + } + } + + return nil +} + +func (cbos CloudBasedObjectStorage) Get(id objects.ObjectId) ([]byte, error) { + b, err := cbos.CS.Get(cbos.objidToKey(id)) + if err != nil { + return []byte{}, err + } + + return cbos.Crypter.Decrypt(b) +} + +func (cbos CloudBasedObjectStorage) Has(id objects.ObjectId) (bool, error) { + return cbos.CS.Has(cbos.objidToKey(id)) +} + +func (cbos CloudBasedObjectStorage) Set(id objects.ObjectId, typ objects.ObjectType, raw []byte) error { + b, err := cbos.Crypter.Encrypt(raw) + if err != nil { + return err + } + + if err := cbos.CS.Put(cbos.objidToKey(id), b); err != nil { + return err + } + + // could be used to repopulate the index (not implemented yet) + if err := cbos.CS.Put(cbos.Prefix+"typeof/"+id.String(), []byte(typ)); err != nil { + return err + } + + cbos.index.Set(id, typ) + + return nil +} + +func (cbos CloudBasedObjectStorage) List(typ objects.ObjectType) ([]objects.ObjectId, error) { + return cbos.index.List(typ), nil +} + +func (cbos CloudBasedObjectStorage) Close() (outerr error) { + defer func() { + err := cbos.CS.Close() + if outerr == nil { + outerr = err + } + }() + + // We need to adress the problem of parallel index creation here. + // We handle this by adding a random hex number to the index name. + // When loading the index, all "index/*" objects will be read and combined + // and all but the one with the largest number will be deleted. + + buf := new(bytes.Buffer) + if outerr = cbos.index.Save(buf); outerr != nil { + return outerr + } + + index_name := fmt.Sprintf("%sindex/%016x", cbos.Prefix, rand.Int63()) + return cbos.CS.Put(index_name, buf.Bytes()) +} + +type cloudObjectStorageCreator func(conf config.Config, name string) (CloudStorage, error) + +func cloudStorageCreator(cloudCreator cloudObjectStorageCreator) storage.CreateStorageFromConfig { + return func(conf config.Config, name string) (storage.Storage, error) { + var cbos CloudBasedObjectStorage + + storageconf := conf.Storage[name] + + storageconf.Get("prefix", &cbos.Prefix) + + encrypt_for := "" + storageconf.Get("encrypt_for", &encrypt_for) + + if encrypt_for != "" { + cbos.Crypter = gpg.Crypter{ + gpg.Encrypter{Key: encrypt_for}, + gpg.Decrypter{}, + } + } + + var err error + if cbos.CS, err = cloudCreator(conf, name); err != nil { + return nil, err + } + + err = cbos.Init() + return cbos, err + } +} diff --git a/storage/cloud/swift.go b/storage/cloud/swift.go new file mode 100644 index 0000000..41a13c6 --- /dev/null +++ b/storage/cloud/swift.go @@ -0,0 +1,118 @@ +package cloud + +import ( + "code.laria.me/petrific/config" + "code.laria.me/petrific/storage" + "fmt" + "github.com/ncw/swift" + "time" +) + +type SwiftCloudStorage struct { + con *swift.Connection + container string +} + +type SwiftStorageConfigMandatoryError struct { + Key string + Err error +} + +func (e SwiftStorageConfigMandatoryError) Error() string { + return fmt.Sprintf("Could not get mandatory key %s for swift storage: %s", e.Key, e.Err.Error()) +} + +func SwiftStorageCreator() storage.CreateStorageFromConfig { + return cloudStorageCreator(func(conf config.Config, name string) (CloudStorage, error) { + storage_conf := conf.Storage[name] + storage := SwiftCloudStorage{} + storage.con = new(swift.Connection) + + // Mandatory options + if err := storage_conf.Get("container", &storage.container); err != nil { + return nil, SwiftStorageConfigMandatoryError{"container", err} + } + if err := storage_conf.Get("user_name", &storage.con.UserName); err != nil { + return nil, SwiftStorageConfigMandatoryError{"user_name", err} + } + if err := storage_conf.Get("api_key", &storage.con.ApiKey); err != nil { + return nil, SwiftStorageConfigMandatoryError{"api_key", err} + } + if err := storage_conf.Get("auth_url", &storage.con.AuthUrl); err != nil { + return nil, SwiftStorageConfigMandatoryError{"auth_url", err} + } + + // Optional... options + + storage_conf.Get("domain", &storage.con.Domain) + storage_conf.Get("domain_id", &storage.con.DomainId) + storage_conf.Get("user_id", &storage.con.UserId) + storage_conf.Get("retries", &storage.con.Retries) + storage_conf.Get("region", &storage.con.Region) + storage_conf.Get("auth_version", &storage.con.AuthVersion) + storage_conf.Get("tenant", &storage.con.Tenant) + storage_conf.Get("tenant_id", &storage.con.TenantId) + storage_conf.Get("tenant_domain", &storage.con.TenantDomain) + storage_conf.Get("tenant_domain_id", &storage.con.TenantDomainId) + storage_conf.Get("trust_id", &storage.con.TrustId) + + var connect_timeout_str string + if storage_conf.Get("connect_timeout", &connect_timeout_str) == nil { + d, err := time.ParseDuration(connect_timeout_str) + if err != nil { + return nil, err + } + storage.con.ConnectTimeout = d + } + + var timeout_str string + if storage_conf.Get("timeout", &timeout_str) == nil { + d, err := time.ParseDuration(timeout_str) + if err != nil { + return nil, err + } + storage.con.Timeout = d + } + + if err := storage.con.Authenticate(); err != nil { + return nil, err + } + + return storage, nil + }) +} + +func (scs SwiftCloudStorage) Get(key string) ([]byte, error) { + return scs.con.ObjectGetBytes(scs.container, key) +} + +func (scs SwiftCloudStorage) Has(key string) (bool, error) { + switch _, _, err := scs.con.Object(scs.container, key); err { + case nil: + return true, nil + case swift.ObjectNotFound: + return false, nil + default: + return false, err + } +} + +func (scs SwiftCloudStorage) Put(key string, content []byte) error { + err := scs.con.ObjectPutBytes(scs.container, key, content, "application/octet-stream") + + return err +} + +func (scs SwiftCloudStorage) Delete(key string) error { + return scs.con.ObjectDelete(scs.container, key) +} + +func (scs SwiftCloudStorage) List(prefix string) ([]string, error) { + return scs.con.ObjectNamesAll(scs.container, &swift.ObjectsOpts{ + Prefix: prefix, + }) +} + +func (scs SwiftCloudStorage) Close() error { + return nil +} diff --git a/storage/index.go b/storage/index.go index f34abab..1d01b93 100644 --- a/storage/index.go +++ b/storage/index.go @@ -69,3 +69,11 @@ func (idx Index) Load(r io.Reader) error { } return scan.Err() } + +func (a Index) Combine(b Index) { + for t, objs := range b { + for id := range objs { + a[t][id] = struct{}{} + } + } +} diff --git a/storage/registry/registry.go b/storage/registry/registry.go new file mode 100644 index 0000000..dec7c32 --- /dev/null +++ b/storage/registry/registry.go @@ -0,0 +1,13 @@ +package registry + +import ( + "code.laria.me/petrific/storage" + "code.laria.me/petrific/storage/cloud" +) + +// List af all available storage types +var StorageTypes = map[string]storage.CreateStorageFromConfig{ + "local": storage.LocalStorageFromConfig, + "memory": storage.MemoryStorageFromConfig, + "openstack-swift": cloud.SwiftStorageCreator(), +} diff --git a/storage/storage.go b/storage/storage.go index 7d09a70..176e75d 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -25,11 +25,6 @@ type Storage interface { type CreateStorageFromConfig func(conf config.Config, name string) (Storage, error) -var StorageTypes = map[string]CreateStorageFromConfig{ - "local": LocalStorageFromConfig, - "memory": MemoryStorageFromConfig, -} - func SetObject(s Storage, o objects.RawObject) (id objects.ObjectId, err error) { buf := new(bytes.Buffer) |