X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/88a29cd091468feb98e5cd541c560f4d35bca716..051ad2017b69ca8e438396b461525e485a896321:/services/keepstore/azure_blob_volume.go diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go index f18d82c06b..5da2055b77 100644 --- a/services/keepstore/azure_blob_volume.go +++ b/services/keepstore/azure_blob_volume.go @@ -18,6 +18,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "git.curoverse.com/arvados.git/sdk/go/arvados" @@ -452,7 +453,7 @@ func (v *AzureBlobVolume) Touch(loc string) error { return os.ErrNotExist } - metadata["touch"] = fmt.Sprintf("%d", time.Now()) + metadata["touch"] = fmt.Sprintf("%d", time.Now().Unix()) return v.container.SetBlobMetadata(loc, metadata, nil) } @@ -620,49 +621,67 @@ func (v *AzureBlobVolume) isKeepBlock(s string) bool { // and deletes them from the volume. func (v *AzureBlobVolume) EmptyTrash() { var bytesDeleted, bytesInTrash int64 - var blocksDeleted, blocksInTrash int - params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}} + var blocksDeleted, blocksInTrash int64 - for { - resp, err := v.container.ListBlobs(params) + doBlob := func(b storage.Blob) { + // Check whether the block is flagged as trash + if b.Metadata["expires_at"] == "" { + return + } + + atomic.AddInt64(&blocksInTrash, 1) + atomic.AddInt64(&bytesInTrash, b.Properties.ContentLength) + + expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64) if err != nil { - log.Printf("EmptyTrash: ListBlobs: %v", err) - break + log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err) + return } - for _, b := range resp.Blobs { - // Check if the block is expired - if b.Metadata["expires_at"] == "" { - continue - } - blocksInTrash++ - bytesInTrash += b.Properties.ContentLength + if expiresAt > time.Now().Unix() { + return + } - expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64) - if err != nil { - log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err) - continue - } + err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{ + IfMatch: b.Properties.Etag, + }) + if err != nil { + log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err) + return + } + atomic.AddInt64(&blocksDeleted, 1) + atomic.AddInt64(&bytesDeleted, b.Properties.ContentLength) + } - if expiresAt > time.Now().Unix() { - continue + var wg sync.WaitGroup + todo := make(chan storage.Blob, theConfig.EmptyTrashWorkers) + for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for b := range todo { + doBlob(b) } + }() + } - err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{ - IfMatch: b.Properties.Etag, - }) - if err != nil { - log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err) - continue - } - blocksDeleted++ - bytesDeleted += b.Properties.ContentLength + params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}} + for { + resp, err := v.container.ListBlobs(params) + if err != nil { + log.Printf("EmptyTrash: ListBlobs: %v", err) + break + } + for _, b := range resp.Blobs { + todo <- b } if resp.NextMarker == "" { break } params.Marker = resp.NextMarker } + close(todo) + wg.Wait() log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted) }