aboutsummaryrefslogtreecommitdiff
path: root/storage
diff options
context:
space:
mode:
authorLaria Carolin Chabowski <laria@laria.me>2017-09-26 21:33:50 +0200
committerLaria Carolin Chabowski <laria@laria.me>2017-10-03 15:01:38 +0200
commitb48f1f2c372d1fa7bca155c449c590d852a03511 (patch)
tree29d468067d6381fdb3a810c9f05dde8daccc06df /storage
parentb2742dc28d1ff9001cd784455bbdf9cf29539c30 (diff)
downloadpetrific-b48f1f2c372d1fa7bca155c449c590d852a03511.tar.gz
petrific-b48f1f2c372d1fa7bca155c449c590d852a03511.tar.bz2
petrific-b48f1f2c372d1fa7bca155c449c590d852a03511.zip
Add cloudstorage support
For now Openstack Swift is supported. But it should be pretty easy to implement other Swift / S3 like object storages.
Diffstat (limited to 'storage')
-rw-r--r--storage/cloud/cloudstorage.go183
-rw-r--r--storage/cloud/swift.go118
-rw-r--r--storage/index.go8
-rw-r--r--storage/registry/registry.go13
-rw-r--r--storage/storage.go5
5 files changed, 322 insertions, 5 deletions
diff --git a/storage/cloud/cloudstorage.go b/storage/cloud/cloudstorage.go
new file mode 100644
index 0000000..9ae8ee8
--- /dev/null
+++ b/storage/cloud/cloudstorage.go
@@ -0,0 +1,183 @@
+// Package cloud provides utilities to implement a petrific storage using a cloud-based object-storage (S3/Openstack Swift style)
+
+package cloud
+
+import (
+ "bytes"
+ "code.laria.me/petrific/config"
+ "code.laria.me/petrific/gpg"
+ "code.laria.me/petrific/objects"
+ "code.laria.me/petrific/storage"
+ "errors"
+ "fmt"
+ "math/rand"
+)
+
+type CloudStorage interface {
+ Get(key string) ([]byte, error)
+ Has(key string) (bool, error)
+ Put(key string, content []byte) error
+ Delete(key string) error
+ List(prefix string) ([]string, error)
+
+ Close() error
+}
+
+var (
+ NotFoundErr = errors.New("Object not found") // Cloud object could not be found
+)
+
+// Crypter provides de-/encrypting facilities for CloudBasedObjectStorage
+type Crypter interface {
+ Encrypt([]byte) ([]byte, error)
+ Decrypt([]byte) ([]byte, error)
+}
+
+// NopCrypter implements Crypter by not de/-encrypting at all
+type NopCrypter struct{}
+
+func (NopCrypter) Encrypt(in []byte) ([]byte, error) { return in, nil }
+func (NopCrypter) Decrypt(in []byte) ([]byte, error) { return in, nil }
+
+type CloudBasedObjectStorage struct {
+ CS CloudStorage
+ Prefix string
+ Crypter Crypter
+
+ index storage.Index
+}
+
+func (cbos CloudBasedObjectStorage) objidToKey(id objects.ObjectId) string {
+ return cbos.Prefix + "obj/" + id.String()
+}
+
+func (cbos CloudBasedObjectStorage) readIndex(name string) (storage.Index, error) {
+ index := storage.NewIndex()
+
+ b, err := cbos.CS.Get(name)
+ if err != nil {
+ return index, err
+ }
+
+ err = index.Load(bytes.NewReader(b))
+ return index, err
+}
+
+func (cbos *CloudBasedObjectStorage) Init() error {
+ cbos.index = storage.NewIndex()
+
+ // Load and combine all indexes, keep only the one with the "largest" name (see also Close())
+ index_names, err := cbos.CS.List(cbos.Prefix + "index/")
+ if err != nil {
+ return err
+ }
+
+ max_index := ""
+ for _, index_name := range index_names {
+ index, err := cbos.readIndex(index_name)
+ if err != nil {
+ return err
+ }
+
+ cbos.index.Combine(index)
+ }
+
+ for _, index_name := range index_names {
+ if index_name != max_index {
+ if err := cbos.CS.Delete(index_name); err != nil {
+ return err
+ }
+ }
+ }
+
+ return nil
+}
+
+func (cbos CloudBasedObjectStorage) Get(id objects.ObjectId) ([]byte, error) {
+ b, err := cbos.CS.Get(cbos.objidToKey(id))
+ if err != nil {
+ return []byte{}, err
+ }
+
+ return cbos.Crypter.Decrypt(b)
+}
+
+func (cbos CloudBasedObjectStorage) Has(id objects.ObjectId) (bool, error) {
+ return cbos.CS.Has(cbos.objidToKey(id))
+}
+
+func (cbos CloudBasedObjectStorage) Set(id objects.ObjectId, typ objects.ObjectType, raw []byte) error {
+ b, err := cbos.Crypter.Encrypt(raw)
+ if err != nil {
+ return err
+ }
+
+ if err := cbos.CS.Put(cbos.objidToKey(id), b); err != nil {
+ return err
+ }
+
+ // could be used to repopulate the index (not implemented yet)
+ if err := cbos.CS.Put(cbos.Prefix+"typeof/"+id.String(), []byte(typ)); err != nil {
+ return err
+ }
+
+ cbos.index.Set(id, typ)
+
+ return nil
+}
+
+func (cbos CloudBasedObjectStorage) List(typ objects.ObjectType) ([]objects.ObjectId, error) {
+ return cbos.index.List(typ), nil
+}
+
+func (cbos CloudBasedObjectStorage) Close() (outerr error) {
+ defer func() {
+ err := cbos.CS.Close()
+ if outerr == nil {
+ outerr = err
+ }
+ }()
+
+ // We need to adress the problem of parallel index creation here.
+ // We handle this by adding a random hex number to the index name.
+ // When loading the index, all "index/*" objects will be read and combined
+ // and all but the one with the largest number will be deleted.
+
+ buf := new(bytes.Buffer)
+ if outerr = cbos.index.Save(buf); outerr != nil {
+ return outerr
+ }
+
+ index_name := fmt.Sprintf("%sindex/%016x", cbos.Prefix, rand.Int63())
+ return cbos.CS.Put(index_name, buf.Bytes())
+}
+
+type cloudObjectStorageCreator func(conf config.Config, name string) (CloudStorage, error)
+
+func cloudStorageCreator(cloudCreator cloudObjectStorageCreator) storage.CreateStorageFromConfig {
+ return func(conf config.Config, name string) (storage.Storage, error) {
+ var cbos CloudBasedObjectStorage
+
+ storageconf := conf.Storage[name]
+
+ storageconf.Get("prefix", &cbos.Prefix)
+
+ encrypt_for := ""
+ storageconf.Get("encrypt_for", &encrypt_for)
+
+ if encrypt_for != "" {
+ cbos.Crypter = gpg.Crypter{
+ gpg.Encrypter{Key: encrypt_for},
+ gpg.Decrypter{},
+ }
+ }
+
+ var err error
+ if cbos.CS, err = cloudCreator(conf, name); err != nil {
+ return nil, err
+ }
+
+ err = cbos.Init()
+ return cbos, err
+ }
+}
diff --git a/storage/cloud/swift.go b/storage/cloud/swift.go
new file mode 100644
index 0000000..41a13c6
--- /dev/null
+++ b/storage/cloud/swift.go
@@ -0,0 +1,118 @@
+package cloud
+
+import (
+ "code.laria.me/petrific/config"
+ "code.laria.me/petrific/storage"
+ "fmt"
+ "github.com/ncw/swift"
+ "time"
+)
+
+type SwiftCloudStorage struct {
+ con *swift.Connection
+ container string
+}
+
+type SwiftStorageConfigMandatoryError struct {
+ Key string
+ Err error
+}
+
+func (e SwiftStorageConfigMandatoryError) Error() string {
+ return fmt.Sprintf("Could not get mandatory key %s for swift storage: %s", e.Key, e.Err.Error())
+}
+
+func SwiftStorageCreator() storage.CreateStorageFromConfig {
+ return cloudStorageCreator(func(conf config.Config, name string) (CloudStorage, error) {
+ storage_conf := conf.Storage[name]
+ storage := SwiftCloudStorage{}
+ storage.con = new(swift.Connection)
+
+ // Mandatory options
+ if err := storage_conf.Get("container", &storage.container); err != nil {
+ return nil, SwiftStorageConfigMandatoryError{"container", err}
+ }
+ if err := storage_conf.Get("user_name", &storage.con.UserName); err != nil {
+ return nil, SwiftStorageConfigMandatoryError{"user_name", err}
+ }
+ if err := storage_conf.Get("api_key", &storage.con.ApiKey); err != nil {
+ return nil, SwiftStorageConfigMandatoryError{"api_key", err}
+ }
+ if err := storage_conf.Get("auth_url", &storage.con.AuthUrl); err != nil {
+ return nil, SwiftStorageConfigMandatoryError{"auth_url", err}
+ }
+
+ // Optional... options
+
+ storage_conf.Get("domain", &storage.con.Domain)
+ storage_conf.Get("domain_id", &storage.con.DomainId)
+ storage_conf.Get("user_id", &storage.con.UserId)
+ storage_conf.Get("retries", &storage.con.Retries)
+ storage_conf.Get("region", &storage.con.Region)
+ storage_conf.Get("auth_version", &storage.con.AuthVersion)
+ storage_conf.Get("tenant", &storage.con.Tenant)
+ storage_conf.Get("tenant_id", &storage.con.TenantId)
+ storage_conf.Get("tenant_domain", &storage.con.TenantDomain)
+ storage_conf.Get("tenant_domain_id", &storage.con.TenantDomainId)
+ storage_conf.Get("trust_id", &storage.con.TrustId)
+
+ var connect_timeout_str string
+ if storage_conf.Get("connect_timeout", &connect_timeout_str) == nil {
+ d, err := time.ParseDuration(connect_timeout_str)
+ if err != nil {
+ return nil, err
+ }
+ storage.con.ConnectTimeout = d
+ }
+
+ var timeout_str string
+ if storage_conf.Get("timeout", &timeout_str) == nil {
+ d, err := time.ParseDuration(timeout_str)
+ if err != nil {
+ return nil, err
+ }
+ storage.con.Timeout = d
+ }
+
+ if err := storage.con.Authenticate(); err != nil {
+ return nil, err
+ }
+
+ return storage, nil
+ })
+}
+
+func (scs SwiftCloudStorage) Get(key string) ([]byte, error) {
+ return scs.con.ObjectGetBytes(scs.container, key)
+}
+
+func (scs SwiftCloudStorage) Has(key string) (bool, error) {
+ switch _, _, err := scs.con.Object(scs.container, key); err {
+ case nil:
+ return true, nil
+ case swift.ObjectNotFound:
+ return false, nil
+ default:
+ return false, err
+ }
+}
+
+func (scs SwiftCloudStorage) Put(key string, content []byte) error {
+ err := scs.con.ObjectPutBytes(scs.container, key, content, "application/octet-stream")
+
+ return err
+}
+
+func (scs SwiftCloudStorage) Delete(key string) error {
+ return scs.con.ObjectDelete(scs.container, key)
+}
+
+func (scs SwiftCloudStorage) List(prefix string) ([]string, error) {
+ return scs.con.ObjectNamesAll(scs.container, &swift.ObjectsOpts{
+ Prefix: prefix,
+ })
+}
+
+func (scs SwiftCloudStorage) Close() error {
+ return nil
+}
diff --git a/storage/index.go b/storage/index.go
index f34abab..1d01b93 100644
--- a/storage/index.go
+++ b/storage/index.go
@@ -69,3 +69,11 @@ func (idx Index) Load(r io.Reader) error {
}
return scan.Err()
}
+
+func (a Index) Combine(b Index) {
+ for t, objs := range b {
+ for id := range objs {
+ a[t][id] = struct{}{}
+ }
+ }
+}
diff --git a/storage/registry/registry.go b/storage/registry/registry.go
new file mode 100644
index 0000000..dec7c32
--- /dev/null
+++ b/storage/registry/registry.go
@@ -0,0 +1,13 @@
+package registry
+
+import (
+ "code.laria.me/petrific/storage"
+ "code.laria.me/petrific/storage/cloud"
+)
+
+// List af all available storage types
+var StorageTypes = map[string]storage.CreateStorageFromConfig{
+ "local": storage.LocalStorageFromConfig,
+ "memory": storage.MemoryStorageFromConfig,
+ "openstack-swift": cloud.SwiftStorageCreator(),
+}
diff --git a/storage/storage.go b/storage/storage.go
index 7d09a70..176e75d 100644
--- a/storage/storage.go
+++ b/storage/storage.go
@@ -25,11 +25,6 @@ type Storage interface {
type CreateStorageFromConfig func(conf config.Config, name string) (Storage, error)
-var StorageTypes = map[string]CreateStorageFromConfig{
- "local": LocalStorageFromConfig,
- "memory": MemoryStorageFromConfig,
-}
-
func SetObject(s Storage, o objects.RawObject) (id objects.ObjectId, err error) {
buf := new(bytes.Buffer)