13383: Implement EmptyTrashWorkers in azure and filesystem backends.
author Tom Clegg <tclegg@veritasgenetics.com>
Fri, 4 May 2018 20:48:50 +0000 (16:48 -0400)
committer Tom Clegg <tclegg@veritasgenetics.com>
Fri, 4 May 2018 20:52:05 +0000 (16:52 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

services/keepstore/azure_blob_volume.go
services/keepstore/s3_volume.go
services/keepstore/usage.go
services/keepstore/volume_unix.go

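diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 828a1f1b7a485b4353f32ace30de5c8cf9a40192..5da2055b7736d117f6a7015a8486a948ee80a4d7 100644 (file)
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go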
@@ -18,6 +18,7 @@ import (
        "strconv"
        "strings"
        "sync"
+       "sync/atomic"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -620,49 +621,67 @@ func (v *AzureBlobVolume) isKeepBlock(s string) bool {
 // and deletes them from the volume.
 func (v *AzureBlobVolume) EmptyTrash() {
        var bytesDeleted, bytesInTrash int64
-       var blocksDeleted, blocksInTrash int
-       params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}}
+       var blocksDeleted, blocksInTrash int64
 
-       for {
-               resp, err := v.container.ListBlobs(params)
+       doBlob := func(b storage.Blob) {
+               // Check whether the block is flagged as trash
+               if b.Metadata["expires_at"] == "" {
+                       return
+               }
+
+               atomic.AddInt64(&blocksInTrash, 1)
+               atomic.AddInt64(&bytesInTrash, b.Properties.ContentLength)
+
+               expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
                if err != nil {
-                       log.Printf("EmptyTrash: ListBlobs: %v", err)
-                       break
+                       log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
+                       return
                }
-               for _, b := range resp.Blobs {
-                       // Check if the block is expired
-                       if b.Metadata["expires_at"] == "" {
-                               continue
-                       }
 
-                       blocksInTrash++
-                       bytesInTrash += b.Properties.ContentLength
+               if expiresAt > time.Now().Unix() {
+                       return
+               }
 
-                       expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
-                       if err != nil {
-                               log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
-                               continue
-                       }
+               err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{
+                       IfMatch: b.Properties.Etag,
+               })
+               if err != nil {
+                       log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
+                       return
+               }
+               atomic.AddInt64(&blocksDeleted, 1)
+               atomic.AddInt64(&bytesDeleted, b.Properties.ContentLength)
+       }
 
-                       if expiresAt > time.Now().Unix() {
-                               continue
+       var wg sync.WaitGroup
+       todo := make(chan storage.Blob, theConfig.EmptyTrashWorkers)
+       for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       for b := range todo {
+                               doBlob(b)
                        }
+               }()
+       }
 
-                       err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{
-                               IfMatch: b.Properties.Etag,
-                       })
-                       if err != nil {
-                               log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
-                               continue
-                       }
-                       blocksDeleted++
-                       bytesDeleted += b.Properties.ContentLength
+       params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}}
+       for {
+               resp, err := v.container.ListBlobs(params)
+               if err != nil {
+                       log.Printf("EmptyTrash: ListBlobs: %v", err)
+                       break
+               }
+               for _, b := range resp.Blobs {
+                       todo <- b
                }
                if resp.NextMarker == "" {
                        break
                }
                params.Marker = resp.NextMarker
        }
+       close(todo)
+       wg.Wait()
 
        log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
 }
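diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index b5a1c97b48c1b8269ae6384c1716d93cd129500c..9d4d8019282ebf01160544d940345b36fe892076 100644 (file)
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go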
@@ -850,7 +850,7 @@ func (v *S3Volume) EmptyTrash() {
        }
 
        var wg sync.WaitGroup
-       todo := make(chan *s3.Key)
+       todo := make(chan *s3.Key, theConfig.EmptyTrashWorkers)
        for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
                wg.Add(1)
                go func() {
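diff --git a/services/keepstore/usage.go b/services/keepstore/usage.go
index 5f6fd90a1cda96bd41f0039d9b5be491c39a050b..672d7cf1694e78df6e5269dfdab58a5cddbb8139 100644 (file)
--- a/services/keepstore/usage.go
+++ b/services/keepstore/usage.go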
@@ -118,6 +118,21 @@ TrashCheckInterval:
     How often to check for (and delete) trashed blocks whose
     TrashLifetime has expired.
 
+TrashWorkers:
+
+    Maximum number of concurrent trash operations. Default is 1, i.e.,
+    trash lists are processed serially.
+
+EmptyTrashWorkers:
+
+    Maximum number of concurrent block deletion operations (per
+    volume) when emptying trash. Default is 1.
+
+PullWorkers:
+
+    Maximum number of concurrent pull operations. Default is 1, i.e.,
+    pull lists are processed serially.
+
 Volumes:
 
     List of storage volumes. If omitted or empty, the default is to
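diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 5a04ffd944c17ab51de93a41fd1d6994fff1ecbe..23d675359244942097072d88e1bd98daf9d46c6c 100644 (file)
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go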
@@ -18,6 +18,7 @@ import (
        "strconv"
        "strings"
        "sync"
+       "sync/atomic"
        "syscall"
        "time"
 )
@@ -725,39 +726,61 @@ var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
 // and deletes those with deadline < now.
 func (v *UnixVolume) EmptyTrash() {
        var bytesDeleted, bytesInTrash int64
-       var blocksDeleted, blocksInTrash int
+       var blocksDeleted, blocksInTrash int64
 
-       err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
-               if err != nil {
-                       log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
-                       return nil
-               }
+       doFile := func(path string, info os.FileInfo) {
                if info.Mode().IsDir() {
-                       return nil
+                       return
                }
                matches := unixTrashLocRegexp.FindStringSubmatch(path)
                if len(matches) != 3 {
-                       return nil
+                       return
                }
                deadline, err := strconv.ParseInt(matches[2], 10, 64)
                if err != nil {
                        log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err)
-                       return nil
+                       return
                }
-               bytesInTrash += info.Size()
-               blocksInTrash++
+               atomic.AddInt64(&bytesInTrash, info.Size())
+               atomic.AddInt64(&blocksInTrash, 1)
                if deadline > time.Now().Unix() {
-                       return nil
+                       return
                }
                err = v.os.Remove(path)
                if err != nil {
                        log.Printf("EmptyTrash: Remove %v: %v", path, err)
+                       return
+               }
+               atomic.AddInt64(&bytesDeleted, info.Size())
+               atomic.AddInt64(&blocksDeleted, 1)
+       }
+
+       type dirent struct {
+               path string
+               info os.FileInfo
+       }
+       var wg sync.WaitGroup
+       todo := make(chan dirent, theConfig.EmptyTrashWorkers)
+       for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       for e := range todo {
+                               doFile(e.path, e.info)
+                       }
+               }()
+       }
+
+       err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
+               if err != nil {
+                       log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
                        return nil
                }
-               bytesDeleted += info.Size()
-               blocksDeleted++
+               todo <- dirent{path, info}
                return nil
        })
+       close(todo)
+       wg.Wait()
 
        if err != nil {
                log.Printf("EmptyTrash error for %v: %v", v.String(), err)