Merge branch 'master' into 13822-nm-delayed-daemon
[arvados.git] / services / keepstore / azure_blob_volume.go
index f470034ba689f35441980c96ee9dac75c149794a..5da2055b7736d117f6a7015a8486a948ee80a4d7 100644 (file)
@@ -18,10 +18,11 @@ import (
        "strconv"
        "strings"
        "sync"
+       "sync/atomic"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "github.com/curoverse/azure-sdk-for-go/storage"
+       "github.com/Azure/azure-sdk-for-go/storage"
 )
 
 const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
@@ -105,6 +106,7 @@ type AzureBlobVolume struct {
        AzureReplication      int
        ReadOnly              bool
        RequestTimeout        arvados.Duration
+       StorageClasses        []string
 
        azClient  storage.Client
        container *azureContainer
@@ -451,7 +453,7 @@ func (v *AzureBlobVolume) Touch(loc string) error {
                return os.ErrNotExist
        }
 
-       metadata["touch"] = fmt.Sprintf("%d", time.Now())
+       metadata["touch"] = fmt.Sprintf("%d", time.Now().Unix())
        return v.container.SetBlobMetadata(loc, metadata, nil)
 }
 
@@ -590,6 +592,11 @@ func (v *AzureBlobVolume) Replication() int {
        return v.AzureReplication
 }
 
+// GetStorageClasses implements Volume
+func (v *AzureBlobVolume) GetStorageClasses() []string {
+       return v.StorageClasses
+}
+
 // If possible, translate an Azure SDK error to a recognizable error
 // like os.ErrNotExist.
 func (v *AzureBlobVolume) translateError(err error) error {
@@ -614,49 +621,67 @@ func (v *AzureBlobVolume) isKeepBlock(s string) bool {
 // and deletes them from the volume.
 func (v *AzureBlobVolume) EmptyTrash() {
        var bytesDeleted, bytesInTrash int64
-       var blocksDeleted, blocksInTrash int
-       params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}}
+       var blocksDeleted, blocksInTrash int64
 
-       for {
-               resp, err := v.container.ListBlobs(params)
+       doBlob := func(b storage.Blob) {
+               // Check whether the block is flagged as trash
+               if b.Metadata["expires_at"] == "" {
+                       return
+               }
+
+               atomic.AddInt64(&blocksInTrash, 1)
+               atomic.AddInt64(&bytesInTrash, b.Properties.ContentLength)
+
+               expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
                if err != nil {
-                       log.Printf("EmptyTrash: ListBlobs: %v", err)
-                       break
+                       log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
+                       return
                }
-               for _, b := range resp.Blobs {
-                       // Check if the block is expired
-                       if b.Metadata["expires_at"] == "" {
-                               continue
-                       }
 
-                       blocksInTrash++
-                       bytesInTrash += b.Properties.ContentLength
+               if expiresAt > time.Now().Unix() {
+                       return
+               }
 
-                       expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
-                       if err != nil {
-                               log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
-                               continue
-                       }
+               err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{
+                       IfMatch: b.Properties.Etag,
+               })
+               if err != nil {
+                       log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
+                       return
+               }
+               atomic.AddInt64(&blocksDeleted, 1)
+               atomic.AddInt64(&bytesDeleted, b.Properties.ContentLength)
+       }
 
-                       if expiresAt > time.Now().Unix() {
-                               continue
+       var wg sync.WaitGroup
+       todo := make(chan storage.Blob, theConfig.EmptyTrashWorkers)
+       for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       for b := range todo {
+                               doBlob(b)
                        }
+               }()
+       }
 
-                       err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{
-                               IfMatch: b.Properties.Etag,
-                       })
-                       if err != nil {
-                               log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
-                               continue
-                       }
-                       blocksDeleted++
-                       bytesDeleted += b.Properties.ContentLength
+       params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}}
+       for {
+               resp, err := v.container.ListBlobs(params)
+               if err != nil {
+                       log.Printf("EmptyTrash: ListBlobs: %v", err)
+                       break
+               }
+               for _, b := range resp.Blobs {
+                       todo <- b
                }
                if resp.NextMarker == "" {
                        break
                }
                params.Marker = resp.NextMarker
        }
+       close(todo)
+       wg.Wait()
 
        log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
 }