8556: implement trash/untrash for azure volumes.
authorradhika <radhika@curoverse.com>
Tue, 24 May 2016 16:39:28 +0000 (12:39 -0400)
committerradhika <radhika@curoverse.com>
Tue, 24 May 2016 16:39:28 +0000 (12:39 -0400)
services/keepstore/azure_blob_volume.go
services/keepstore/azure_blob_volume_test.go
services/keepstore/s3_volume.go
services/keepstore/volume_generic_test.go
services/keepstore/volume_unix.go

index 5bd7f102556a8f2b532dc0c0e74790cb00d60489..99da2a3a3de35de90be820b1a5285e5b592004d7 100644 (file)
@@ -10,6 +10,7 @@ import (
        "log"
        "os"
        "regexp"
+       "strconv"
        "strings"
        "sync"
        "time"
@@ -133,6 +134,18 @@ func (v *AzureBlobVolume) Check() error {
        return nil
 }
 
+// Return true if expires_at metadata attribute is found on the block
+func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
+       metadata, err := v.bsClient.GetBlobMetadata(v.containerName, loc)
+       if err != nil {
+               return false, metadata, v.translateError(err)
+       }
+       if metadata["expires_at"] != "" {
+               return true, metadata, nil
+       }
+       return false, metadata, nil
+}
+
 // Get reads a Keep block that has been stored as a block blob in the
 // container.
 //
@@ -140,6 +153,13 @@ func (v *AzureBlobVolume) Check() error {
 // unexpectedly empty, assume a PutBlob operation is in progress, and
 // wait for it to finish writing.
 func (v *AzureBlobVolume) Get(loc string, buf []byte) (int, error) {
+       trashed, _, err := v.checkTrashed(loc)
+       if err != nil {
+               return 0, err
+       }
+       if trashed {
+               return 0, os.ErrNotExist
+       }
        var deadline time.Time
        haveDeadline := false
        size, err := v.get(loc, buf)
@@ -241,6 +261,13 @@ func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
 
 // Compare the given data with existing stored data.
 func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
+       trashed, _, err := v.checkTrashed(loc)
+       if err != nil {
+               return err
+       }
+       if trashed {
+               return os.ErrNotExist
+       }
        rdr, err := v.bsClient.GetBlob(v.containerName, loc)
        if err != nil {
                return v.translateError(err)
@@ -262,13 +289,28 @@ func (v *AzureBlobVolume) Touch(loc string) error {
        if v.readonly {
                return MethodDisabledError
        }
-       return v.bsClient.SetBlobMetadata(v.containerName, loc, map[string]string{
-               "touch": fmt.Sprintf("%d", time.Now()),
-       }, nil)
+       trashed, metadata, err := v.checkTrashed(loc)
+       if err != nil {
+               return err
+       }
+       if trashed {
+               return os.ErrNotExist
+       }
+
+       metadata["touch"] = fmt.Sprintf("%d", time.Now())
+       return v.bsClient.SetBlobMetadata(v.containerName, loc, metadata, nil)
 }
 
 // Mtime returns the last-modified property of a block blob.
 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
+       trashed, _, err := v.checkTrashed(loc)
+       if err != nil {
+               return time.Time{}, err
+       }
+       if trashed {
+               return time.Time{}, os.ErrNotExist
+       }
+
        props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
        if err != nil {
                return time.Time{}, err
@@ -280,7 +322,8 @@ func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
 // container.
 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
        params := storage.ListBlobsParameters{
-               Prefix: prefix,
+               Prefix:  prefix,
+               Include: "metadata",
        }
        for {
                resp, err := v.bsClient.ListBlobs(v.containerName, params)
@@ -303,6 +346,10 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
                                // value.
                                continue
                        }
+                       if b.Metadata["expires_at"] != "" {
+                               // Trashed blob; exclude it from response
+                               continue
+                       }
                        fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.Unix())
                }
                if resp.NextMarker == "" {
@@ -318,10 +365,6 @@ func (v *AzureBlobVolume) Trash(loc string) error {
                return MethodDisabledError
        }
 
-       if trashLifetime != 0 {
-               return ErrNotImplemented
-       }
-
        // Ideally we would use If-Unmodified-Since, but that
        // particular condition seems to be ignored by Azure. Instead,
        // we get the Etag before checking Mtime, and use If-Match to
@@ -336,15 +379,38 @@ func (v *AzureBlobVolume) Trash(loc string) error {
        } else if time.Since(t) < blobSignatureTTL {
                return nil
        }
-       return v.bsClient.DeleteBlob(v.containerName, loc, map[string]string{
+
+       // If trashLifetime == 0, just delete it
+       if trashLifetime == 0 {
+               return v.bsClient.DeleteBlob(v.containerName, loc, map[string]string{
+                       "If-Match": props.Etag,
+               })
+       }
+
+       // Otherwise, mark as trash
+       return v.bsClient.SetBlobMetadata(v.containerName, loc, map[string]string{
+               "expires_at": fmt.Sprintf("%d", time.Now().Add(trashLifetime).Unix()),
+       }, map[string]string{
                "If-Match": props.Etag,
        })
 }
 
 // Untrash a Keep block.
-// TBD
+// Delete the expires_at metadata attribute
 func (v *AzureBlobVolume) Untrash(loc string) error {
-       return ErrNotImplemented
+       // if expires_at does not exist, return NotFoundError
+       metadata, err := v.bsClient.GetBlobMetadata(v.containerName, loc)
+       if err != nil {
+               return v.translateError(err)
+       }
+       if metadata["expires_at"] == "" {
+               return os.ErrNotExist
+       }
+
+       // reset expires_at metadata attribute
+       metadata["expires_at"] = ""
+       err = v.bsClient.SetBlobMetadata(v.containerName, loc, metadata, nil)
+       return v.translateError(err)
 }
 
 // Status returns a VolumeStatus struct with placeholder data.
@@ -379,7 +445,7 @@ func (v *AzureBlobVolume) translateError(err error) error {
        switch {
        case err == nil:
                return err
-       case strings.Contains(err.Error(), "404 Not Found"):
+       case strings.Contains(err.Error(), "Not Found"):
                // "storage: service returned without a response body (404 Not Found)"
                return os.ErrNotExist
        default:
@@ -395,6 +461,51 @@ func (v *AzureBlobVolume) isKeepBlock(s string) bool {
 
 // EmptyTrash looks for trashed blocks that exceeded trashLifetime
 // and deletes them from the volume.
-// TBD
 func (v *AzureBlobVolume) EmptyTrash() {
+       var bytesDeleted, bytesInTrash int64
+       var blocksDeleted, blocksInTrash int
+       params := storage.ListBlobsParameters{Include: "metadata"}
+
+       for {
+               resp, err := v.bsClient.ListBlobs(v.containerName, params)
+               if err != nil {
+                       log.Printf("EmptyTrash: ListBlobs: %v", err)
+                       break
+               }
+               for _, b := range resp.Blobs {
+                       // Check if the block is expired
+                       if b.Metadata["expires_at"] == "" {
+                               continue
+                       }
+
+                       blocksInTrash++
+                       bytesInTrash += b.Properties.ContentLength
+
+                       expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
+                       if err != nil {
+                               log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
+                               continue
+                       }
+
+                       if expiresAt > time.Now().Unix() {
+                               continue
+                       }
+
+                       err = v.bsClient.DeleteBlob(v.containerName, b.Name, map[string]string{
+                               "If-Match": b.Properties.Etag,
+                       })
+                       if err != nil {
+                               log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
+                               continue
+                       }
+                       blocksDeleted++
+                       bytesDeleted += b.Properties.ContentLength
+               }
+               if resp.NextMarker == "" {
+                       break
+               }
+               params.Marker = resp.NextMarker
+       }
+
+       log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
 }
index e3c0e27083245f2d7cbead33f915f9365774cc02..5d556b3e8c40eb242addf53f4996c49eb396138f 100644 (file)
@@ -74,6 +74,7 @@ func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
        h.blobs[container+"|"+hash] = &azBlob{
                Data:        data,
                Mtime:       time.Now(),
+               Metadata:    make(map[string]string),
                Uncommitted: make(map[string][]byte),
        }
 }
@@ -136,14 +137,23 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
                        h.blobs[container+"|"+hash] = &azBlob{
                                Mtime:       time.Now(),
                                Uncommitted: make(map[string][]byte),
+                               Metadata:    make(map[string]string),
                                Etag:        makeEtag(),
                        }
                        h.unlockAndRace()
                }
+               metadata := make(map[string]string)
+               for k, v := range r.Header {
+                       if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
+                               name := k[len("x-ms-meta-"):]
+                               metadata[strings.ToLower(name)] = v[0]
+                       }
+               }
                h.blobs[container+"|"+hash] = &azBlob{
                        Data:        body,
                        Mtime:       time.Now(),
                        Uncommitted: make(map[string][]byte),
+                       Metadata:    metadata,
                        Etag:        makeEtag(),
                }
                rw.WriteHeader(http.StatusCreated)
@@ -196,11 +206,22 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
                blob.Metadata = make(map[string]string)
                for k, v := range r.Header {
                        if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
-                               blob.Metadata[k] = v[0]
+                               name := k[len("x-ms-meta-"):]
+                               blob.Metadata[strings.ToLower(name)] = v[0]
                        }
                }
                blob.Mtime = time.Now()
                blob.Etag = makeEtag()
+       case (r.Method == "GET" || r.Method == "HEAD") && r.Form.Get("comp") == "metadata" && hash != "":
+               // "Get Blob Metadata" API
+               if !blobExists {
+                       rw.WriteHeader(http.StatusNotFound)
+                       return
+               }
+               for k, v := range blob.Metadata {
+                       rw.Header().Set(fmt.Sprintf("x-ms-meta-%s", k), v)
+               }
+               return
        case (r.Method == "GET" || r.Method == "HEAD") && hash != "":
                // "Get Blob" API
                if !blobExists {
@@ -265,14 +286,20 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
                        }
                        if len(resp.Blobs) > 0 || marker == "" || marker == hash {
                                blob := h.blobs[container+"|"+hash]
-                               resp.Blobs = append(resp.Blobs, storage.Blob{
+                               bmeta := map[string]string(nil)
+                               if r.Form.Get("include") == "metadata" {
+                                       bmeta = blob.Metadata
+                               }
+                               b := storage.Blob{
                                        Name: hash,
                                        Properties: storage.BlobProperties{
                                                LastModified:  blob.Mtime.Format(time.RFC1123),
                                                ContentLength: int64(len(blob.Data)),
                                                Etag:          blob.Etag,
                                        },
-                               })
+                                       Metadata: bmeta,
+                               }
+                               resp.Blobs = append(resp.Blobs, b)
                        }
                }
                buf, err := xml.Marshal(resp)
index d068b2a6e5da0601eb7f9bb4c6a0ec8e4b891774..80a7c89f2ed4f6669566711c40c4d0a59940e439 100644 (file)
@@ -10,6 +10,7 @@ import (
        "net/http"
        "os"
        "regexp"
+       "strings"
        "time"
 
        "github.com/AdRoll/goamz/aws"
@@ -310,7 +311,8 @@ func (v *S3Volume) isKeepBlock(s string) bool {
 func (v *S3Volume) translateError(err error) error {
        switch err := err.(type) {
        case *s3.Error:
-               if err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey" {
+               if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
+                       strings.Contains(err.Error(), "Not Found") {
                        return os.ErrNotExist
                }
                // Other 404 errors like NoSuchVersion and
index 105795c146e2d932f3066ee2f26948dd639d9436..f8fe0d0ebce719c6c823fe9caa9fcce12324eb49 100644 (file)
@@ -453,6 +453,27 @@ func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
        if _, err := v.Get(TestHash, data); err == nil || !os.IsNotExist(err) {
                t.Errorf("os.IsNotExist(%v) should have been true", err)
        }
+
+       _, err := v.Mtime(TestHash)
+       if err == nil || !os.IsNotExist(err) {
+               t.Fatalf("os.IsNotExist(%v) should have been true", err)
+       }
+
+       err = v.Compare(TestHash, TestBlock)
+       if err == nil || !os.IsNotExist(err) {
+               t.Fatalf("os.IsNotExist(%v) should have been true", err)
+       }
+
+       indexBuf := new(bytes.Buffer)
+       v.IndexTo("", indexBuf)
+       if strings.Contains(string(indexBuf.Bytes()), TestHash) {
+               t.Fatalf("Found trashed block in IndexTo")
+       }
+
+       err = v.Touch(TestHash)
+       if err == nil || !os.IsNotExist(err) {
+               t.Fatalf("os.IsNotExist(%v) should have been true", err)
+       }
 }
 
 // Calling Delete() for a block that does not exist should result in error.
@@ -723,11 +744,11 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
        err = v.Trash(TestHash)
        if v.Writable() == false {
                if err != MethodDisabledError {
-                       t.Error(err)
+                       t.Fatal(err)
                }
        } else if err != nil {
                if err != ErrNotImplemented {
-                       t.Error(err)
+                       t.Fatal(err)
                }
        } else {
                _, err = v.Get(TestHash, buf)
@@ -768,6 +789,23 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
                if bytes.Compare(buf[:n], TestBlock) != 0 {
                        t.Fatalf("Got data %+q, expected %+q", buf[:n], TestBlock)
                }
+
+               _, err = v.Mtime(TestHash)
+               if err != nil {
+                       return err
+               }
+
+               err = v.Compare(TestHash, TestBlock)
+               if err != nil {
+                       return err
+               }
+
+               indexBuf := new(bytes.Buffer)
+               v.IndexTo("", indexBuf)
+               if !strings.Contains(string(indexBuf.Bytes()), TestHash) {
+                       return os.ErrNotExist
+               }
+
                return nil
        }
 
@@ -783,6 +821,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
                t.Fatal(err)
        }
 
+       // Trash the block
        err = v.Trash(TestHash)
        if err == MethodDisabledError || err == ErrNotImplemented {
                // Skip the trash tests for read-only volumes, and
@@ -795,6 +834,11 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
                t.Fatalf("os.IsNotExist(%v) should have been true", err)
        }
 
+       err = v.Touch(TestHash)
+       if err == nil || !os.IsNotExist(err) {
+               t.Fatalf("os.IsNotExist(%v) should have been true", err)
+       }
+
        v.EmptyTrash()
 
        // Even after emptying the trash, we can untrash our block
@@ -803,11 +847,20 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
        if err != nil {
                t.Fatal(err)
        }
+
        err = checkGet()
        if err != nil {
                t.Fatal(err)
        }
 
+       err = v.Touch(TestHash)
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       // Because we Touch'ed, need to backdate again for next set of tests
+       v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+
        // Untrash should fail if the only block in the trash has
        // already been untrashed.
        err = v.Untrash(TestHash)
@@ -848,11 +901,14 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 
        // Trash it again, and this time call EmptyTrash so it really
        // goes away.
+       // (In Azure volumes, un/trash changes Mtime, so first backdate again)
+       v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
        err = v.Trash(TestHash)
        err = checkGet()
        if err == nil || !os.IsNotExist(err) {
-               t.Errorf("os.IsNotExist(%v) should have been true", err)
+               t.Fatalf("os.IsNotExist(%v) should have been true", err)
        }
+       // EmptryTrash
        v.EmptyTrash()
 
        // Untrash won't find it
index edec048dfe5836701f821beefdc3d7b131777982..7aff85e59a4357acb1e27ce5386756feb96fa0e1 100644 (file)
@@ -538,7 +538,7 @@ func (v *UnixVolume) translateError(err error) error {
        }
 }
 
-var trashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
+var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
 
 // EmptyTrash walks hierarchy looking for {hash}.trash.*
 // and deletes those with deadline < now.
@@ -554,7 +554,7 @@ func (v *UnixVolume) EmptyTrash() {
                if info.Mode().IsDir() {
                        return nil
                }
-               matches := trashLocRegexp.FindStringSubmatch(path)
+               matches := unixTrashLocRegexp.FindStringSubmatch(path)
                if len(matches) != 3 {
                        return nil
                }